Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:25 UTC

[16/20] Rename tez-engine-api to tez-runtime-api and split tez-engine into two modules: tez-engine-library for user-visible Input/Output/Processor implementations, and tez-engine-internals for framework internals

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
deleted file mode 100644
index b7867aa..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Iterates values while keys match in sorted input.
- * 
- * This class is not thread safe. Accessing methods from multiple threads will
- * lead to corrupt data.
- * 
- */
-public class ValuesIterator<KEY,VALUE> {
-  protected TezRawKeyValueIterator in; //input iterator
-  private KEY key;               // current key
-  private KEY nextKey;
-  private VALUE value;             // current value
-  //private boolean hasNext;                      // more w/ this key
-  private boolean more;                         // more in file
-  private RawComparator<KEY> comparator;
-  private Deserializer<KEY> keyDeserializer;
-  private Deserializer<VALUE> valDeserializer;
-  private DataInputBuffer keyIn = new DataInputBuffer();
-  private DataInputBuffer valueIn = new DataInputBuffer();
-  private TezCounter inputKeyCounter;
-  private TezCounter inputValueCounter;
-  
-  private int keyCtr = 0;
-  private boolean hasMoreValues; // For the current key.
-  private boolean isFirstRecord = true;
-  
-  public ValuesIterator (TezRawKeyValueIterator in, 
-                         RawComparator<KEY> comparator, 
-                         Class<KEY> keyClass,
-                         Class<VALUE> valClass, Configuration conf,
-                         TezCounter inputKeyCounter,
-                         TezCounter inputValueCounter)
-    throws IOException {
-    this.in = in;
-    this.comparator = comparator;
-    this.inputKeyCounter = inputKeyCounter;
-    this.inputValueCounter = inputValueCounter;
-    SerializationFactory serializationFactory = new SerializationFactory(conf);
-    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
-    this.keyDeserializer.open(keyIn);
-    this.valDeserializer = serializationFactory.getDeserializer(valClass);
-    this.valDeserializer.open(this.valueIn);
-  }
-
-  TezRawKeyValueIterator getRawIterator() { return in; }
-
-  /**
-   * Move to the next K-Vs pair
-   * @return true if another pair exists, otherwise false.
-   * @throws IOException 
-   */
-  public boolean moveToNext() throws IOException {
-    if (isFirstRecord) {
-      readNextKey();
-      key = nextKey;
-      nextKey = null;
-      hasMoreValues = more;
-      isFirstRecord = false;
-    } else {
-      nextKey();
-    }
-    return more;
-  }
-  
-  /** The current key. */
-  public KEY getKey() { 
-    return key; 
-  }
-  
-  // TODO NEWTEZ Maybe add another method which returns an iterator instead of iterable
-  
-  public Iterable<VALUE> getValues() {
-    return new Iterable<VALUE>() {
-
-      @Override
-      public Iterator<VALUE> iterator() {
-        
-        return new Iterator<VALUE>() {
-
-          private final int keyNumber = keyCtr;
-          
-          @Override
-          public boolean hasNext() {
-            return hasMoreValues;
-          }
-
-          @Override
-          public VALUE next() {
-            if (!hasMoreValues) {
-              throw new NoSuchElementException("iterate past last value");
-            }
-            Preconditions
-                .checkState(
-                    keyNumber == keyCtr,
-                    "Cannot use values iterator on the previous K-V pair after moveToNext has been invoked to move to the next K-V pair");
-            
-            try {
-              readNextValue();
-              readNextKey();
-            } catch (IOException ie) {
-              throw new RuntimeException("problem advancing post rec#"+keyCtr, ie);
-            }
-            inputValueCounter.increment(1);
-            return value;
-          }
-
-          @Override
-          public void remove() {
-            throw new UnsupportedOperationException("Cannot remove elements");
-          }
-        };
-      }
-    };
-  }
-  
-  
-
-  /** Start processing next unique key. */
-  private void nextKey() throws IOException {
-    // read until we find a new key
-    while (hasMoreValues) { 
-      readNextKey();
-    }
-    if (more) {
-      inputKeyCounter.increment(1);
-      ++keyCtr;
-    }
-    
-    // move the next key to the current one
-    KEY tmpKey = key;
-    key = nextKey;
-    nextKey = tmpKey;
-    hasMoreValues = more;
-  }
-
-  /** 
-   * read the next key - which may be the same as the current key.
-   */
-  private void readNextKey() throws IOException {
-    more = in.next();
-    if (more) {      
-      DataInputBuffer nextKeyBytes = in.getKey();
-      keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
-      nextKey = keyDeserializer.deserialize(nextKey);
-      hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0);
-    } else {
-      hasMoreValues = false;
-    }
-  }
-
-  /**
-   * Read the next value
-   * @throws IOException
-   */
-  private void readNextValue() throws IOException {
-    DataInputBuffer nextValueBytes = in.getValue();
-    valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
-    value = valDeserializer.deserialize(value);
-  }
-}
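
For context, a minimal usage sketch of the iterator above. All arguments (the raw iterator, comparator, configuration and counters) are assumed to be supplied by the framework; the Text/IntWritable types and the class name are placeholders chosen for illustration only.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.engine.common.ValuesIterator;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;

public class ValuesIteratorUsage {

  // Consumes grouped key/value pairs from a sorted raw iterator.
  static void consume(TezRawKeyValueIterator rawIter,
      RawComparator<Text> comparator, Configuration conf,
      TezCounter keyCounter, TezCounter valueCounter) throws IOException {
    ValuesIterator<Text, IntWritable> values =
        new ValuesIterator<Text, IntWritable>(rawIter, comparator,
            Text.class, IntWritable.class, conf, keyCounter, valueCounter);
    while (values.moveToNext()) {        // advance to the next unique key
      Text key = values.getKey();
      for (IntWritable value : values.getValues()) {
        // Process (key, value). getValues() must be consumed before the
        // next moveToNext() call, as enforced by the Preconditions check.
      }
    }
  }
}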

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java b/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java
deleted file mode 100644
index 48ad639..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-@Private
-@Unstable
-public class YARNMaster {
-  
-  public enum State {
-    INITIALIZING, RUNNING;
-  }
-
-  public static String getMasterUserName(Configuration conf) {
-    return conf.get(YarnConfiguration.RM_PRINCIPAL);
-  }
-  
-  public static InetSocketAddress getMasterAddress(Configuration conf) {
-    return conf.getSocketAddr(
-        YarnConfiguration.RM_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_PORT);
-  }
-
-  public static String getMasterPrincipal(Configuration conf) 
-  throws IOException {
-    String masterHostname = getMasterAddress(conf).getHostName();
-    // get kerberos principal for use as delegation token renewer
-    return SecurityUtil.getServerPrincipal(
-        getMasterUserName(conf), masterHostname);
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java
deleted file mode 100644
index b387b36..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.combine;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-/**
- * <b>Combiner Initialization</b><p> The Combiner class is picked up
- * using the TEZ_ENGINE_COMBINER_CLASS attribute in {@link TezJobConfig}.
- *
- *
- * Combiners need to provide a single argument ({@link TezTaskContext})
- * constructor.
- */
-@Unstable
-@LimitedPrivate("mapreduce")
-public interface Combiner {
-  public void combine(TezRawKeyValueIterator rawIter, Writer writer)
-      throws InterruptedException, IOException;
-}
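
A bare-bones implementation of this interface might look like the sketch below. The pass-through behaviour is purely illustrative, and the append(DataInputBuffer, DataInputBuffer) overload on IFile.Writer is assumed from the Hadoop IFile lineage rather than shown in this patch; the single-argument (TezTaskContext) constructor mentioned in the javadoc is omitted because that type does not appear in this diff.

import java.io.IOException;

import org.apache.tez.engine.common.combine.Combiner;
import org.apache.tez.engine.common.sort.impl.IFile.Writer;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;

public class PassThroughCombiner implements Combiner {

  // Copies every key/value pair straight through without combining.
  @Override
  public void combine(TezRawKeyValueIterator rawIter, Writer writer)
      throws InterruptedException, IOException {
    while (rawIter.next()) {
      // Assumes Writer exposes append(DataInputBuffer, DataInputBuffer),
      // as in Hadoop's IFile.Writer.
      writer.append(rawIter.getKey(), rawIter.getValue());
    }
  }
}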

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
deleted file mode 100644
index 546151f..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.localshuffle;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.sort.impl.TezMerger;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-
-@SuppressWarnings({"rawtypes"})
-public class LocalShuffle {
-
-  // TODO NEWTEZ This is broken.
-
-  private final TezInputContext inputContext;
-  private final Configuration conf;
-  private final int numInputs;
-
-  private final Class keyClass;
-  private final Class valClass;
-  private final RawComparator comparator;
-
-  private final FileSystem rfs;
-  private final int sortFactor;
-  
-  private final TezCounter spilledRecordsCounter;
-  private final CompressionCodec codec;
-  private final TezTaskOutput mapOutputFile;
-
-  public LocalShuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
-    this.inputContext = inputContext;
-    this.conf = conf;
-    this.numInputs = numInputs;
-    
-    this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
-    this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
-    this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
-    
-    this.sortFactor =
-        conf.getInt(
-            TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR, 
-            TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
-    
-    this.rfs = FileSystem.getLocal(conf).getRaw();
-
-    this.spilledRecordsCounter = inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
-    
- // compression
-    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
-      Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
-      this.codec = ReflectionUtils.newInstance(codecClass, conf);
-    } else {
-      this.codec = null;
-    }
-    
-    // Always local
-    this.mapOutputFile = new TezLocalTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
-  }
- 
-  
-  public TezRawKeyValueIterator run() throws IOException {
-    // Copy is complete, obviously! 
-
-    
-    // Merge
-    return TezMerger.merge(conf, rfs, 
-        keyClass, valClass,
-        codec, 
-        getMapFiles(),
-        false, 
-        sortFactor,
-        new Path(inputContext.getUniqueIdentifier()), // TODO NEWTEZ This is likely broken 
-        comparator,
-        null, spilledRecordsCounter, null, null);
-  }
-  
-  private Path[] getMapFiles() 
-  throws IOException {
-    List<Path> fileList = new ArrayList<Path>();
-      // for local jobs
-      for(int i = 0; i < numInputs; ++i) {
-        //fileList.add(mapOutputFile.getInputFile(i));
-      }
-      
-    return fileList.toArray(new Path[0]);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryImpl.java
deleted file mode 100644
index 351e01c..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryImpl.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.objectregistry;
-
-import java.util.AbstractMap;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import com.google.inject.Singleton;
-
-@Singleton
-public class ObjectRegistryImpl implements ObjectRegistry {
-
-  private Map<String, Map.Entry<Object, ObjectLifeCycle>> objectCache =
-      new HashMap<String, Map.Entry<Object, ObjectLifeCycle>>();
-
-  @Override
-  public synchronized Object add(ObjectLifeCycle lifeCycle,
-      String key, Object value) {
-    Map.Entry<Object, ObjectLifeCycle> oldEntry =
-        objectCache.put(key,
-            new AbstractMap.SimpleImmutableEntry<Object, ObjectLifeCycle>(
-                value, lifeCycle));
-    return oldEntry != null ? oldEntry.getKey() : null;
-  }
-
-  @Override
-  public synchronized Object get(String key) {
-    Map.Entry<Object, ObjectLifeCycle> entry =
-        objectCache.get(key);
-    return entry != null ? entry.getKey() : null;
-  }
-
-  @Override
-  public synchronized boolean delete(String key) {
-    return (null != objectCache.remove(key));
-  }
-
-  public synchronized void clearCache(ObjectLifeCycle lifeCycle) {
-    for (Entry<String, Entry<Object, ObjectLifeCycle>> entry :
-      objectCache.entrySet()) {
-      if (entry.getValue().getValue().equals(lifeCycle)) {
-        objectCache.remove(entry.getKey());
-      }
-    }
-  }
-
-}
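
For illustration, a minimal sketch of how the registry is used. The lifeCycle value is taken as a parameter because the ObjectLifeCycle constants themselves are not part of this diff; the key and cached object are arbitrary examples.

import java.util.HashMap;

import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle;
import org.apache.tez.engine.common.objectregistry.ObjectRegistry;
import org.apache.tez.engine.common.objectregistry.ObjectRegistryImpl;

public class RegistryUsage {

  static void demo(ObjectLifeCycle lifeCycle) {
    ObjectRegistry registry = new ObjectRegistryImpl();
    // add() returns the value previously cached under the key, or null.
    Object previous = registry.add(lifeCycle, "lookup-table",
        new HashMap<String, String>());
    Object cached = registry.get("lookup-table");  // the HashMap added above
    boolean removed = registry.delete("lookup-table");
  }
}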

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryModule.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryModule.java b/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryModule.java
deleted file mode 100644
index ab346fd..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryModule.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.objectregistry;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.inject.AbstractModule;
-
-public class ObjectRegistryModule extends AbstractModule {
-
-  private final ObjectRegistry objectRegistry;
-
-  public ObjectRegistryModule(ObjectRegistry objectRegistry) {
-    this.objectRegistry = objectRegistry;
-  }
-
-  @VisibleForTesting
-  public ObjectRegistryModule() {
-    objectRegistry = new ObjectRegistryImpl();
-  }
-
-  @Override
-  protected void configure() {
-    bind(ObjectRegistry.class).toInstance(this.objectRegistry);
-    requestStaticInjection(ObjectRegistryFactory.class);
-  }
-
-}
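
The module is consumed through a standard Guice injector; a hedged sketch of that wiring, using an ObjectRegistryImpl instance purely as an example:

import com.google.inject.Guice;
import com.google.inject.Injector;

import org.apache.tez.engine.common.objectregistry.ObjectRegistry;
import org.apache.tez.engine.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.engine.common.objectregistry.ObjectRegistryModule;

public class RegistryWiring {

  public static void main(String[] args) {
    ObjectRegistry registry = new ObjectRegistryImpl();
    Injector injector = Guice.createInjector(new ObjectRegistryModule(registry));
    // The bound instance is returned for every ObjectRegistry injection point;
    // static members of ObjectRegistryFactory are injected as a side effect.
    ObjectRegistry bound = injector.getInstance(ObjectRegistry.class);
    assert bound == registry;
  }
}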

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java b/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java
deleted file mode 100644
index 827001b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.security;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-
-/**
- * The token identifier for job token
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class JobTokenIdentifier extends TokenIdentifier {
-  private Text jobid;
-  public final static Text KIND_NAME = new Text("mapreduce.job");
-  
-  /**
-   * Default constructor
-   */
-  public JobTokenIdentifier() {
-    this.jobid = new Text();
-  }
-
-  /**
-   * Create a job token identifier from a jobid
-   * @param jobid the jobid to use
-   */
-  public JobTokenIdentifier(Text jobid) {
-    this.jobid = jobid;
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public Text getKind() {
-    return KIND_NAME;
-  }
-  
-  /** {@inheritDoc} */
-  @Override
-  public UserGroupInformation getUser() {
-    if (jobid == null || "".equals(jobid.toString())) {
-      return null;
-    }
-    return UserGroupInformation.createRemoteUser(jobid.toString());
-  }
-  
-  /**
-   * Get the jobid
-   * @return the jobid
-   */
-  public Text getJobId() {
-    return jobid;
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    jobid.readFields(in);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void write(DataOutput out) throws IOException {
-    jobid.write(out);
-  }
-
-  @InterfaceAudience.Private
-  public static class Renewer extends Token.TrivialRenewer {
-    @Override
-    protected Text getKind() {
-      return KIND_NAME;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSecretManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSecretManager.java
deleted file mode 100644
index d957b8b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSecretManager.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.security;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-import javax.crypto.SecretKey;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.Token;
-
-/**
- * SecretManager for job token. It can be used to cache generated job tokens.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier> {
-  private final SecretKey masterKey;
-  private final Map<String, SecretKey> currentJobTokens;
-
-  /**
-   * Convert the byte[] to a secret key
-   * @param key the byte[] to create the secret key from
-   * @return the secret key
-   */
-  public static SecretKey createSecretKey(byte[] key) {
-    return SecretManager.createSecretKey(key);
-  }
-  
-  /**
-   * Compute the HMAC hash of the message using the key
-   * @param msg the message to hash
-   * @param key the key to use
-   * @return the computed hash
-   */
-  public static byte[] computeHash(byte[] msg, SecretKey key) {
-    return createPassword(msg, key);
-  }
-  
-  /**
-   * Default constructor
-   */
-  public JobTokenSecretManager() {
-    this.masterKey = generateSecret();
-    this.currentJobTokens = new TreeMap<String, SecretKey>();
-  }
-  
-  /**
-   * Create a new password/secret for the given job token identifier.
-   * @param identifier the job token identifier
-   * @return token password/secret
-   */
-  @Override
-  public byte[] createPassword(JobTokenIdentifier identifier) {
-    byte[] result = createPassword(identifier.getBytes(), masterKey);
-    return result;
-  }
-
-  /**
-   * Add the job token of a job to cache
-   * @param jobId the job that owns the token
-   * @param token the job token
-   */
-  public void addTokenForJob(String jobId, Token<JobTokenIdentifier> token) {
-    SecretKey tokenSecret = createSecretKey(token.getPassword());
-    synchronized (currentJobTokens) {
-      currentJobTokens.put(jobId, tokenSecret);
-    }
-  }
-
-  /**
-   * Remove the cached job token of a job from cache
-   * @param jobId the job whose token is to be removed
-   */
-  public void removeTokenForJob(String jobId) {
-    synchronized (currentJobTokens) {
-      currentJobTokens.remove(jobId);
-    }
-  }
-  
-  /**
-   * Look up the token password/secret for the given jobId.
-   * @param jobId the jobId to look up
-   * @return token password/secret as SecretKey
-   * @throws InvalidToken
-   */
-  public SecretKey retrieveTokenSecret(String jobId) throws InvalidToken {
-    SecretKey tokenSecret = null;
-    synchronized (currentJobTokens) {
-      tokenSecret = currentJobTokens.get(jobId);
-    }
-    if (tokenSecret == null) {
-      throw new InvalidToken("Can't find job token for job " + jobId + " !!");
-    }
-    return tokenSecret;
-  }
-  
-  /**
-   * Look up the token password/secret for the given job token identifier.
-   * @param identifier the job token identifier to look up
-   * @return token password/secret as byte[]
-   * @throws InvalidToken
-   */
-  @Override
-  public byte[] retrievePassword(JobTokenIdentifier identifier)
-      throws InvalidToken {
-    return retrieveTokenSecret(identifier.getJobId().toString()).getEncoded();
-  }
-
-  /**
-   * Create an empty job token identifier
-   * @return a newly created empty job token identifier
-   */
-  @Override
-  public JobTokenIdentifier createIdentifier() {
-    return new JobTokenIdentifier();
-  }
-}
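
Taken together with JobTokenIdentifier above, a typical round trip looks like the sketch below. The job id is a placeholder, and the example relies on Hadoop's Token(identifier, secretManager) constructor, which asks the manager to create the token password.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.common.security.JobTokenSecretManager;

public class JobTokenRoundTrip {

  public static void main(String[] args) throws Exception {
    JobTokenSecretManager secretManager = new JobTokenSecretManager();
    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text("job_0001"));
    // createPassword(identifier) is invoked via the Token constructor.
    Token<JobTokenIdentifier> jobToken =
        new Token<JobTokenIdentifier>(identifier, secretManager);
    // Cache the token so retrievePassword()/retrieveTokenSecret() can find it.
    secretManager.addTokenForJob("job_0001", jobToken);
    byte[] password = secretManager.retrievePassword(identifier);
  }
}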

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java b/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java
deleted file mode 100644
index 080b9e2..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.security;
-
-import java.util.Collection;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-
-/**
- * Look through tokens to find the first job token that matches the service
- * and return it.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class JobTokenSelector implements TokenSelector<JobTokenIdentifier> {
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public Token<JobTokenIdentifier> selectToken(Text service,
-      Collection<Token<? extends TokenIdentifier>> tokens) {
-    if (service == null) {
-      return null;
-    }
-    for (Token<? extends TokenIdentifier> token : tokens) {
-      if (JobTokenIdentifier.KIND_NAME.equals(token.getKind())
-          && service.equals(token.getService())) {
-        return (Token<JobTokenIdentifier>) token;
-      }
-    }
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java b/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java
deleted file mode 100644
index 20cad0a..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.security;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-@Private
-@Unstable
-public class Master {
-
-  public enum State {
-    INITIALIZING, RUNNING;
-  }
-
-  public static String getMasterUserName(Configuration conf) {
-    return conf.get(YarnConfiguration.RM_PRINCIPAL);
-  }
-
-  public static InetSocketAddress getMasterAddress(Configuration conf) {
-    return conf
-        .getSocketAddr(YarnConfiguration.RM_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_PORT);
-  }
-
-  public static String getMasterPrincipal(Configuration conf)
-      throws IOException {
-    String masterHostname = getMasterAddress(conf).getHostName();
-    // get kerberos principal for use as delegation token renewer
-    return SecurityUtil.getServerPrincipal(getMasterUserName(conf),
-        masterHostname);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java
deleted file mode 100644
index 71d990a..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.security;
-
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.net.URL;
-
-import javax.crypto.SecretKey;
-import javax.servlet.http.HttpServletRequest;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.WritableComparator;
-
-/**
- * 
- * Utilities for generating keys and hashes and verifying them for shuffle.
- *
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class SecureShuffleUtils {
-  public static final String HTTP_HEADER_URL_HASH = "UrlHash";
-  public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
-  
-  /**
-   * Base64 encoded hash of msg
-   * @param msg
-   */
-  public static String generateHash(byte[] msg, SecretKey key) {
-    return new String(Base64.encodeBase64(generateByteHash(msg, key)));
-  }
-  
-  /**
-   * calculate hash of msg
-   * @param msg
-   * @return
-   */
-  private static byte[] generateByteHash(byte[] msg, SecretKey key) {
-    return JobTokenSecretManager.computeHash(msg, key);
-  }
-  
-  /**
-   * verify that hash equals to HMacHash(msg)
-   * @param newHash
-   * @return true if is the same
-   */
-  private static boolean verifyHash(byte[] hash, byte[] msg, SecretKey key) {
-    byte[] msg_hash = generateByteHash(msg, key);
-    return WritableComparator.compareBytes(msg_hash, 0, msg_hash.length, hash, 0, hash.length) == 0;
-  }
-  
-  /**
-   * Aux util to calculate hash of a String
-   * @param enc_str
-   * @param key
-   * @return Base64 encodedHash
-   * @throws IOException
-   */
-  public static String hashFromString(String enc_str, SecretKey key) 
-  throws IOException {
-    return generateHash(enc_str.getBytes(), key); 
-  }
-  
-  /**
-   * verify that base64Hash is same as HMacHash(msg)  
-   * @param base64Hash (Base64 encoded hash)
-   * @param msg
-   * @throws IOException if not the same
-   */
-  public static void verifyReply(String base64Hash, String msg, SecretKey key)
-  throws IOException {
-    byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
-    
-    boolean res = verifyHash(hash, msg.getBytes(), key);
-    
-    if(res != true) {
-      throw new IOException("Verification of the hashReply failed");
-    }
-  }
-  
-  /**
-   * Shuffle specific utils - build string for encoding from URL
-   * @param url
-   * @return string for encoding
-   */
-  public static String buildMsgFrom(URL url) {
-    return buildMsgFrom(url.getPath(), url.getQuery(), url.getPort());
-  }
-  /**
-   * Shuffle specific utils - build string for encoding from URL
-   * @param request
-   * @return string for encoding
-   */
-  public static String buildMsgFrom(HttpServletRequest request ) {
-    return buildMsgFrom(request.getRequestURI(), request.getQueryString(),
-        request.getLocalPort());
-  }
-  /**
-   * Shuffle specific utils - build string for encoding from URL
-   * @param uri_path
-   * @param uri_query
-   * @return string for encoding
-   */
-  private static String buildMsgFrom(String uri_path, String uri_query, int port) {
-    return String.valueOf(port) + uri_path + "?" + uri_query;
-  }
-  
-  
-  /**
-   * byte array to Hex String
-   * @param ba
-   * @return string with HEX value of the key
-   */
-  public static String toHex(byte[] ba) {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    PrintStream ps = new PrintStream(baos);
-    for(byte b: ba) {
-      ps.printf("%x", b);
-    }
-    return baos.toString();
-  }
-}
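
A small sketch of the generate/verify round trip these helpers support. The secret bytes and message are illustrative only; in the shuffle protocol the message is built from the fetch URL with buildMsgFrom().

import javax.crypto.SecretKey;

import org.apache.tez.engine.common.security.JobTokenSecretManager;
import org.apache.tez.engine.common.security.SecureShuffleUtils;

public class ShuffleHashRoundTrip {

  public static void main(String[] args) throws Exception {
    SecretKey key =
        JobTokenSecretManager.createSecretKey("example-secret".getBytes("UTF-8"));
    String msg = "8080/mapOutput?job=job_0001&reduce=0";  // placeholder message
    // One side hashes the message ...
    String hash = SecureShuffleUtils.hashFromString(msg, key);
    // ... the other side verifies it; an IOException signals a mismatch.
    SecureShuffleUtils.verifyReply(hash, msg, key);
  }
}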

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java b/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java
deleted file mode 100644
index 5c42d22..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.security;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.tez.common.TezJobConfig;
-
-
-/**
- * This class provides user facing APIs for transferring secrets from
- * the job client to the tasks.
- * The secrets can be stored just before submission of jobs and read during
- * the task execution.  
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class TokenCache {
-  
-  private static final Log LOG = LogFactory.getLog(TokenCache.class);
-
-  
-  /**
-   * auxiliary method to get user's secret keys..
-   * @param alias
-   * @return secret key from the storage
-   */
-  public static byte[] getSecretKey(Credentials credentials, Text alias) {
-    if(credentials == null)
-      return null;
-    return credentials.getSecretKey(alias);
-  }
-  
-  /**
-   * Convenience method to obtain delegation tokens from namenodes 
-   * corresponding to the paths passed.
-   * @param credentials
-   * @param ps array of paths
-   * @param conf configuration
-   * @throws IOException
-   */
-  public static void obtainTokensForNamenodes(Credentials credentials,
-      Path[] ps, Configuration conf) throws IOException {
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      return;
-    }
-    obtainTokensForNamenodesInternal(credentials, ps, conf);
-  }
-
-  /**
-   * Remove jobtoken referrals which don't make sense in the context
-   * of the task execution.
-   *
-   * @param conf
-   */
-  public static void cleanUpTokenReferral(Configuration conf) {
-    conf.unset(TezJobConfig.DAG_CREDENTIALS_BINARY);
-  }
-
-  static void obtainTokensForNamenodesInternal(Credentials credentials,
-      Path[] ps, Configuration conf) throws IOException {
-    Set<FileSystem> fsSet = new HashSet<FileSystem>();
-    for(Path p: ps) {
-      fsSet.add(p.getFileSystem(conf));
-    }
-    for (FileSystem fs : fsSet) {
-      obtainTokensForNamenodesInternal(fs, credentials, conf);
-    }
-  }
-
-  /**
-   * get delegation token for a specific FS
-   * @param fs
-   * @param credentials
-   * @param p
-   * @param conf
-   * @throws IOException
-   */
-  static void obtainTokensForNamenodesInternal(FileSystem fs, 
-      Credentials credentials, Configuration conf) throws IOException {
-    String delegTokenRenewer = Master.getMasterPrincipal(conf);
-    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-      throw new IOException(
-          "Can't get Master Kerberos principal for use as renewer");
-    }
-    mergeBinaryTokens(credentials, conf);
-
-    final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer,
-                                                     credentials);
-    if (tokens != null) {
-      for (Token<?> token : tokens) {
-        LOG.info("Got dt for " + fs.getUri() + "; "+token);
-      }
-    }
-  }
-
-  private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
-    String binaryTokenFilename =
-        conf.get(TezJobConfig.DAG_CREDENTIALS_BINARY);
-    if (binaryTokenFilename != null) {
-      Credentials binary;
-      try {
-        binary = Credentials.readTokenStorageFile(
-            new Path("file:///" +  binaryTokenFilename), conf);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      // supplement existing tokens with the tokens in the binary file
-      creds.mergeAll(binary);
-    }
-  }
-  
-  /**
-   * file name used on HDFS for generated job token
-   */
-  @InterfaceAudience.Private
-  public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
-
-  /**
-   * conf setting for job tokens cache file name
-   */
-  @InterfaceAudience.Private
-  public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
-  private static final Text JOB_TOKEN = new Text("JobToken");
-  private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
-  
-  /**
-   * load job token from a file
-   * @param conf
-   * @throws IOException
-   */
-  @InterfaceAudience.Private
-  public static Credentials loadTokens(String jobTokenFile, Configuration conf) 
-  throws IOException {
-    Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
-
-    Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
-
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Task: Loaded jobTokenFile from: "+
-          localJobTokenFile.toUri().getPath() 
-          +"; num of sec keys  = " + ts.numberOfSecretKeys() +
-          " Number of tokens " +  ts.numberOfTokens());
-    }
-    return ts;
-  }
-  /**
-   * store job token
-   * @param t
-   */
-  @InterfaceAudience.Private
-  public static void setJobToken(Token<? extends TokenIdentifier> t, 
-      Credentials credentials) {
-    credentials.addToken(JOB_TOKEN, t);
-  }
-  /**
-   * 
-   * @return job token
-   */
-  @SuppressWarnings("unchecked")
-  @InterfaceAudience.Private
-  public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
-    return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
-  }
-
-  @InterfaceAudience.Private
-  public static void setShuffleSecretKey(byte[] key, Credentials credentials) {
-    credentials.addSecretKey(SHUFFLE_TOKEN, key);
-  }
-
-  @InterfaceAudience.Private
-  public static byte[] getShuffleSecretKey(Credentials credentials) {
-    return getSecretKey(credentials, SHUFFLE_TOKEN);
-  }
-}
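
An illustrative sketch of the job-token helpers above; the credentials and token would normally be produced by the job client and read back on the task side.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.common.security.JobTokenSecretManager;
import org.apache.tez.engine.common.security.TokenCache;

public class TokenCacheUsage {

  public static void main(String[] args) {
    Credentials credentials = new Credentials();
    JobTokenSecretManager secretManager = new JobTokenSecretManager();
    Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
        new JobTokenIdentifier(new Text("job_0001")), secretManager);
    // Stash the job token and a shuffle secret in the credentials ...
    TokenCache.setJobToken(jobToken, credentials);
    TokenCache.setShuffleSecretKey(jobToken.getPassword(), credentials);
    // ... and read them back later.
    Token<JobTokenIdentifier> restored = TokenCache.getJobToken(credentials);
    byte[] shuffleKey = TokenCache.getShuffleSecretKey(credentials);
  }
}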

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java
deleted file mode 100644
index 89a5888..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-/**
- * An interface for reporting exceptions to other threads
- */
-interface ExceptionReporter {
-  void reportException(Throwable t);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
deleted file mode 100644
index b48bb0b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
+++ /dev/null
@@ -1,624 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLConnection;
-import java.security.GeneralSecurityException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import javax.crypto.SecretKey;
-import javax.net.ssl.HttpsURLConnection;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.security.SecureShuffleUtils;
-import org.apache.tez.engine.common.shuffle.impl.MapOutput.Type;
-import org.apache.tez.engine.common.sort.impl.IFileInputStream;
-
-import com.google.common.annotations.VisibleForTesting;
-
-class Fetcher extends Thread {
-  
-  private static final Log LOG = LogFactory.getLog(Fetcher.class);
-  
-  /** Basic/unit connection timeout (in milliseconds) */
-  private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
-
-  private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
-                                    CONNECTION, WRONG_REDUCE}
-  
-  private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
-  private final TezCounter connectionErrs;
-  private final TezCounter ioErrs;
-  private final TezCounter wrongLengthErrs;
-  private final TezCounter badIdErrs;
-  private final TezCounter wrongMapErrs;
-  private final TezCounter wrongReduceErrs;
-  private final MergeManager merger;
-  private final ShuffleScheduler scheduler;
-  private final ShuffleClientMetrics metrics;
-  private final Shuffle shuffle;
-  private final int id;
-  private static int nextId = 0;
-  
-  private final int connectionTimeout;
-  private final int readTimeout;
-  
-  // Decompression of map-outputs
-  private final CompressionCodec codec;
-  private final Decompressor decompressor;
-  private final SecretKey jobTokenSecret;
-
-  private volatile boolean stopped = false;
-
-  private Configuration job;
-
-  private static boolean sslShuffle;
-  private static SSLFactory sslFactory;
-
-  public Fetcher(Configuration job, 
-      ShuffleScheduler scheduler, MergeManager merger,
-      ShuffleClientMetrics metrics,
-      Shuffle shuffle, SecretKey jobTokenSecret, TezInputContext inputContext) throws IOException {
-    this.job = job;
-    this.scheduler = scheduler;
-    this.merger = merger;
-    this.metrics = metrics;
-    this.shuffle = shuffle;
-    this.id = ++nextId;
-    this.jobTokenSecret = jobTokenSecret;
-    ioErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.IO_ERROR.toString());
-    wrongLengthErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.WRONG_LENGTH.toString());
-    badIdErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.BAD_ID.toString());
-    wrongMapErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.WRONG_MAP.toString());
-    connectionErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.CONNECTION.toString());
-    wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.WRONG_REDUCE.toString());
-
-    if (ConfigUtils.isIntermediateInputCompressed(job)) {
-      Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getIntermediateInputCompressorClass(job, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, job);
-      decompressor = CodecPool.getDecompressor(codec);
-    } else {
-      codec = null;
-      decompressor = null;
-    }
-
-    this.connectionTimeout = 
-        job.getInt(TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT,
-            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_STALLED_COPY_TIMEOUT);
-    this.readTimeout = 
-        job.getInt(TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT, 
-            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
-
-    setName("fetcher#" + id);
-    setDaemon(true);
-
-    synchronized (Fetcher.class) {
-      sslShuffle = job.getBoolean(TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL,
-          TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_SSL);
-      if (sslShuffle && sslFactory == null) {
-        sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, job);
-        try {
-          sslFactory.init();
-        } catch (Exception ex) {
-          sslFactory.destroy();
-          throw new RuntimeException(ex);
-        }
-      }
-    }
-  }
-  
-  public void run() {
-    try {
-      while (!stopped && !Thread.currentThread().isInterrupted()) {
-        MapHost host = null;
-        try {
-          // If merge is on, block
-          merger.waitForInMemoryMerge();
-
-          // Get a host to shuffle from
-          host = scheduler.getHost();
-          metrics.threadBusy();
-
-          // Shuffle
-          copyFromHost(host);
-        } finally {
-          if (host != null) {
-            scheduler.freeHost(host);
-            metrics.threadFree();            
-          }
-        }
-      }
-    } catch (InterruptedException ie) {
-      return;
-    } catch (Throwable t) {
-      shuffle.reportException(t);
-    }
-  }
-
-  public void shutDown() throws InterruptedException {
-    this.stopped = true;
-    interrupt();
-    try {
-      join(5000);
-    } catch (InterruptedException ie) {
-      LOG.warn("Got interrupt while joining " + getName(), ie);
-    }
-    if (sslFactory != null) {
-      sslFactory.destroy();
-    }
-  }
-
-  @VisibleForTesting
-  protected HttpURLConnection openConnection(URL url) throws IOException {
-    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-    if (sslShuffle) {
-      HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
-      try {
-        httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
-      } catch (GeneralSecurityException ex) {
-        throw new IOException(ex);
-      }
-      httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
-    }
-    return conn;
-  }
-  
-  /**
-   * Fetch all available map-outputs from a single host.
-   * 
-   * @param host {@link MapHost} from which we need to  
-   *              shuffle available map-outputs.
-   */
-  @VisibleForTesting
-  protected void copyFromHost(MapHost host) throws IOException {
-    // Get completed maps on 'host'
-    List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
-    
-    // Sanity check to catch hosts with only 'OBSOLETE' maps, 
-    // especially at the tail of large jobs
-    if (srcAttempts.size() == 0) {
-      return;
-    }
-    
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
-        + srcAttempts);
-    }
-    
-    // List of maps to be fetched yet
-    Set<InputAttemptIdentifier> remaining = new HashSet<InputAttemptIdentifier>(srcAttempts);
-    
-    // Construct the url and connect
-    DataInputStream input;
-    boolean connectSucceeded = false;
-    
-    try {
-      URL url = getMapOutputURL(host, srcAttempts);
-      HttpURLConnection connection = openConnection(url);
-      
-      // generate hash of the url
-      String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
-      String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
-      
-      // put url hash into http header
-      connection.addRequestProperty(
-          SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
-      // set the read timeout
-      connection.setReadTimeout(readTimeout);
-      // put shuffle version into http header
-      connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-      connect(connection, connectionTimeout);
-      connectSucceeded = true;
-      input = new DataInputStream(connection.getInputStream());
-
-      // Validate response code
-      int rc = connection.getResponseCode();
-      if (rc != HttpURLConnection.HTTP_OK) {
-        throw new IOException(
-            "Got invalid response code " + rc + " from " + url +
-            ": " + connection.getResponseMessage());
-      }
-      // get the shuffle version
-      if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
-          connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
-          || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
-              connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
-        throw new IOException("Incompatible shuffle response version");
-      }
-      // get the replyHash which is HMac of the encHash we sent to the server
-      String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
-      if(replyHash==null) {
-        throw new IOException("security validation of TT Map output failed");
-      }
-      LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
-      // verify that replyHash is HMac of encHash
-      SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
-      LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
-    } catch (IOException ie) {
-      ioErrs.increment(1);
-      LOG.warn("Failed to connect to " + host + " with " + remaining.size() + 
-               " map outputs", ie);
-
-      // If connect did not succeed, just mark all the maps as failed,
-      // indirectly penalizing the host
-      if (!connectSucceeded) {
-        for(InputAttemptIdentifier left: remaining) {
-          scheduler.copyFailed(left, host, connectSucceeded);
-        }
-      } else {
-        // If we got a read error at this stage, it implies there was a problem
-        // with the first map, typically lost map. So, penalize only that map
-        // and add the rest
-        InputAttemptIdentifier firstMap = srcAttempts.get(0);
-        scheduler.copyFailed(firstMap, host, connectSucceeded);
-      }
-      
-      // Add back all the remaining maps, WITHOUT marking them as failed
-      for(InputAttemptIdentifier left: remaining) {
-        // TODO Should the first one be skipped ?
-        scheduler.putBackKnownMapOutput(host, left);
-      }
-      
-      return;
-    }
-    
-    try {
-      // Loop through available map-outputs and fetch them
-      // On any error, failedTasks is not null and we exit
-      // after putting back the remaining maps to the 
-      // yet_to_be_fetched list and marking the failed tasks.
-      InputAttemptIdentifier[] failedTasks = null;
-      while (!remaining.isEmpty() && failedTasks == null) {
-        failedTasks = copyMapOutput(host, input, remaining);
-      }
-      
-      if(failedTasks != null && failedTasks.length > 0) {
-        LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
-        for(InputAttemptIdentifier left: failedTasks) {
-          scheduler.copyFailed(left, host, true);
-        }
-      }
-      
-      IOUtils.cleanup(LOG, input);
-      
-      // Sanity check
-      if (failedTasks == null && !remaining.isEmpty()) {
-        throw new IOException("server didn't return all expected map outputs: "
-            + remaining.size() + " left.");
-      }
-    } finally {
-      for (InputAttemptIdentifier left : remaining) {
-        scheduler.putBackKnownMapOutput(host, left);
-      }
-    }
-  }
-  
-  private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
-  
-  private InputAttemptIdentifier[] copyMapOutput(MapHost host,
-                                DataInputStream input,
-                                Set<InputAttemptIdentifier> remaining) {
-    MapOutput mapOutput = null;
-    InputAttemptIdentifier srcAttemptId = null;
-    long decompressedLength = -1;
-    long compressedLength = -1;
-    
-    try {
-      long startTime = System.currentTimeMillis();
-      int forReduce = -1;
-      //Read the shuffle header
-      try {
-        ShuffleHeader header = new ShuffleHeader();
-        header.readFields(input);
-        String pathComponent = header.mapId;
-        srcAttemptId = scheduler.getIdentifierForPathComponent(pathComponent);
-        compressedLength = header.compressedLength;
-        decompressedLength = header.uncompressedLength;
-        forReduce = header.forReduce;
-      } catch (IllegalArgumentException e) {
-        badIdErrs.increment(1);
-        LOG.warn("Invalid map id ", e);
-        //Don't know which one was bad, so consider all of them as bad
-        return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
-      }
-
- 
-      // Do some basic sanity verification
-      if (!verifySanity(compressedLength, decompressedLength, forReduce,
-          remaining, srcAttemptId)) {
-        return new InputAttemptIdentifier[] {srcAttemptId};
-      }
-      
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength + 
-            ", decomp len: " + decompressedLength);
-      }
-      
-      // Get the location for the map output - either in-memory or on-disk
-      mapOutput = merger.reserve(srcAttemptId, decompressedLength, id);
-      
-      // Check if we can shuffle *now* ...
-      if (mapOutput.getType() == Type.WAIT) {
-        LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
-        //Not an error but wait to process data.
-        return EMPTY_ATTEMPT_ID_ARRAY;
-      } 
-      
-      // Go!
-      LOG.info("fetcher#" + id + " about to shuffle output of map " + 
-               mapOutput.getAttemptIdentifier() + " decomp: " +
-               decompressedLength + " len: " + compressedLength + " to " +
-               mapOutput.getType());
-      if (mapOutput.getType() == Type.MEMORY) {
-        shuffleToMemory(host, mapOutput, input, 
-                        (int) decompressedLength, (int) compressedLength);
-      } else {
-        shuffleToDisk(host, mapOutput, input, compressedLength);
-      }
-      
-      // Inform the shuffle scheduler
-      long endTime = System.currentTimeMillis();
-      scheduler.copySucceeded(srcAttemptId, host, compressedLength, 
-                              endTime - startTime, mapOutput);
-      // Note successful shuffle
-      remaining.remove(srcAttemptId);
-      metrics.successFetch();
-      return null;
-    } catch (IOException ioe) {
-      ioErrs.increment(1);
-      if (srcAttemptId == null || mapOutput == null) {
-        LOG.info("fetcher#" + id + " failed to read map header" + 
-                 srcAttemptId + " decomp: " + 
-                 decompressedLength + ", " + compressedLength, ioe);
-        if(srcAttemptId == null) {
-          return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
-        } else {
-          return new InputAttemptIdentifier[] {srcAttemptId};
-        }
-      }
-      
-      LOG.warn("Failed to shuffle output of " + srcAttemptId + 
-               " from " + host.getHostName(), ioe); 
-
-      // Inform the shuffle-scheduler
-      mapOutput.abort();
-      metrics.failedFetch();
-      return new InputAttemptIdentifier[] {srcAttemptId};
-    }
-
-  }
-  
-  /**
-   * Do some basic verification on the input received -- Being defensive
-   * @param compressedLength
-   * @param decompressedLength
-   * @param forReduce
-   * @param remaining
-   * @param srcAttemptId
-   * @return true/false, based on if the verification succeeded or not
-   */
-  private boolean verifySanity(long compressedLength, long decompressedLength,
-      int forReduce, Set<InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) {
-    if (compressedLength < 0 || decompressedLength < 0) {
-      wrongLengthErrs.increment(1);
-      LOG.warn(getName() + " invalid lengths in map output header: id: " +
-          srcAttemptId + " len: " + compressedLength + ", decomp len: " + 
-               decompressedLength);
-      return false;
-    }
-    
-    int reduceStartId = shuffle.getReduceStartId();
-    int reduceRange = shuffle.getReduceRange();
-    if (forReduce < reduceStartId || forReduce >= reduceStartId+reduceRange) {
-      wrongReduceErrs.increment(1);
-      LOG.warn(getName() + " data for the wrong reduce map: " +
-               srcAttemptId + " len: " + compressedLength + " decomp len: " +
-               decompressedLength + " for reduce " + forReduce);
-      return false;
-    }
-
-    // Sanity check
-    if (!remaining.contains(srcAttemptId)) {
-      wrongMapErrs.increment(1);
-      LOG.warn("Invalid map-output! Received output for " + srcAttemptId);
-      return false;
-    }
-    
-    return true;
-  }
-
-  /**
-   * Create the map-output URL for a host. The URL lists all the requested
-   * map ids, separated by commas.
-   * @param host
-   * @param srcAttempts
-   * @return the map-output URL for the given host and source attempts
-   * @throws MalformedURLException
-   */
-  private URL getMapOutputURL(MapHost host, List<InputAttemptIdentifier> srcAttempts
-                              )  throws MalformedURLException {
-    // Get the base url
-    StringBuffer url = new StringBuffer(host.getBaseUrl());
-    
-    boolean first = true;
-    for (InputAttemptIdentifier mapId : srcAttempts) {
-      if (!first) {
-        url.append(",");
-      }
-      url.append(mapId.getPathComponent());
-      first = false;
-    }
-   
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
-    }
-    return new URL(url.toString());
-  }
-  
-  /** 
-   * The connection establishment is attempted multiple times and is given up 
-   * only on the last failure. Instead of connecting with a timeout of 
-   * X, we try connecting with a timeout of x < X but multiple times. 
-   */
-  private void connect(URLConnection connection, int connectionTimeout)
-  throws IOException {
-    int unit = 0;
-    if (connectionTimeout < 0) {
-      throw new IOException("Invalid timeout "
-                            + "[timeout = " + connectionTimeout + " ms]");
-    } else if (connectionTimeout > 0) {
-      unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
-    }
-    // set the connect timeout to the unit-connect-timeout
-    connection.setConnectTimeout(unit);
-    while (true) {
-      try {
-        connection.connect();
-        break;
-      } catch (IOException ioe) {
-        // update the total remaining connect-timeout
-        connectionTimeout -= unit;
-
-        // throw an exception if we have waited for timeout amount of time
-        // note that the updated value of timeout is used here
-        if (connectionTimeout == 0) {
-          throw ioe;
-        }
-
-        // reset the connect timeout for the last try
-        if (connectionTimeout < unit) {
-          unit = connectionTimeout;
-          // reset the connect timeout for the final connect
-          connection.setConnectTimeout(unit);
-        }
-      }
-    }
-  }
-
-  private void shuffleToMemory(MapHost host, MapOutput mapOutput, 
-                               InputStream input, 
-                               int decompressedLength, 
-                               int compressedLength) throws IOException {    
-    IFileInputStream checksumIn = 
-      new IFileInputStream(input, compressedLength, job);
-
-    input = checksumIn;       
-  
-    // Are map-outputs compressed?
-    if (codec != null) {
-      decompressor.reset();
-      input = codec.createInputStream(input, decompressor);
-    }
-  
-    // Copy map-output into an in-memory buffer
-    byte[] shuffleData = mapOutput.getMemory();
-    
-    try {
-      IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
-      metrics.inputBytes(shuffleData.length);
-      LOG.info("Read " + shuffleData.length + " bytes from map-output for " +
-               mapOutput.getAttemptIdentifier());
-    } catch (IOException ioe) {      
-      // Close the streams
-      IOUtils.cleanup(LOG, input);
-
-      // Re-throw
-      throw ioe;
-    }
-
-  }
-  
-  private void shuffleToDisk(MapHost host, MapOutput mapOutput, 
-                             InputStream input, 
-                             long compressedLength) 
-  throws IOException {
-    // Copy data to local-disk
-    OutputStream output = mapOutput.getDisk();
-    long bytesLeft = compressedLength;
-    try {
-      final int BYTES_TO_READ = 64 * 1024;
-      byte[] buf = new byte[BYTES_TO_READ];
-      while (bytesLeft > 0) {
-        int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
-        if (n < 0) {
-          throw new IOException("read past end of stream reading " + 
-                                mapOutput.getAttemptIdentifier());
-        }
-        output.write(buf, 0, n);
-        bytesLeft -= n;
-        metrics.inputBytes(n);
-      }
-
-      LOG.info("Read " + (compressedLength - bytesLeft) + 
-               " bytes from map-output for " +
-               mapOutput.getAttemptIdentifier());
-
-      output.close();
-    } catch (IOException ioe) {
-      // Close the streams
-      IOUtils.cleanup(LOG, input, output);
-
-      // Re-throw
-      throw ioe;
-    }
-
-    // Sanity check
-    if (bytesLeft != 0) {
-      throw new IOException("Incomplete map output received for " +
-                            mapOutput.getAttemptIdentifier() + " from " +
-                            host.getHostName() + " (" + 
-                            bytesLeft + " bytes missing of " + 
-                            compressedLength + ")"
-      );
-    }
-  }
-}
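
For reference, here is a minimal, standalone sketch of the bounded-retry connect pattern that Fetcher.connect() implements above: rather than one attempt with the full timeout, it retries with a smaller "unit" timeout until the overall budget is spent. The class name, the unit value and the sample URL are illustrative assumptions, not Tez or Hadoop APIs; only java.io and java.net are used.

import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;

public class RetryingConnector {

  // Illustrative unit timeout; the real Fetcher uses its own UNIT_CONNECT_TIMEOUT.
  private static final int UNIT_CONNECT_TIMEOUT = 60 * 1000;

  public static void connect(URLConnection connection, int totalTimeout)
      throws IOException {
    if (totalTimeout < 0) {
      throw new IOException("Invalid timeout [timeout = " + totalTimeout + " ms]");
    }
    // A total timeout of 0 means a single attempt with an infinite connect timeout.
    int unit = (totalTimeout > 0) ? Math.min(UNIT_CONNECT_TIMEOUT, totalTimeout) : 0;
    connection.setConnectTimeout(unit);
    while (true) {
      try {
        connection.connect();
        return;
      } catch (IOException ioe) {
        totalTimeout -= unit;              // one unit of the budget is spent
        if (totalTimeout == 0) {
          throw ioe;                       // budget exhausted: give up
        }
        if (totalTimeout < unit) {
          unit = totalTimeout;             // the final attempt gets the remainder
          connection.setConnectTimeout(unit);
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    // Fails fast unless something is listening on this illustrative port.
    URLConnection conn = new URL("http://localhost:13562/mapOutput").openConnection();
    connect(conn, 180 * 1000);
  }
}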

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
deleted file mode 100644
index 1beed44..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.common.sort.impl.IFile.Reader;
-
-/**
- * <code>IFile.InMemoryReader</code> to read map-outputs present in-memory.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class InMemoryReader extends Reader {
-  private final InputAttemptIdentifier taskAttemptId;
-  private final MergeManager merger;
-  DataInputBuffer memDataIn = new DataInputBuffer();
-  private int start;
-  private int length;
-  private int prevKeyPos;
-
-  public InMemoryReader(MergeManager merger, InputAttemptIdentifier taskAttemptId,
-                        byte[] data, int start, int length)
-  throws IOException {
-    super(null, null, length - start, null, null);
-    this.merger = merger;
-    this.taskAttemptId = taskAttemptId;
-
-    buffer = data;
-    bufferSize = (int)fileLength;
-    memDataIn.reset(buffer, start, length);
-    this.start = start;
-    this.length = length;
-  }
-
-  @Override
-  public void reset(int offset) {
-    memDataIn.reset(buffer, start + offset, length);
-    bytesRead = offset;
-    eof = false;
-  }
-
-  @Override
-  public long getPosition() throws IOException {
-    // InMemoryReader does not initialize streams like Reader, so in.getPos()
-    // would not work. Instead, return the number of uncompressed bytes read,
-    // which will be correct since in-memory data is not compressed.
-    return bytesRead;
-  }
-  
-  @Override
-  public long getLength() { 
-    return fileLength;
-  }
-  
-  private void dumpOnError() {
-    File dumpFile = new File("../output/" + taskAttemptId + ".dump");
-    System.err.println("Dumping corrupt map-output of " + taskAttemptId + 
-                       " to " + dumpFile.getAbsolutePath());
-    try {
-      FileOutputStream fos = new FileOutputStream(dumpFile);
-      fos.write(buffer, 0, bufferSize);
-      fos.close();
-    } catch (IOException ioe) {
-      System.err.println("Failed to dump map-output of " + taskAttemptId);
-    }
-  }
-  
-  public KeyState readRawKey(DataInputBuffer key) throws IOException {
-    try {
-      if (!positionToNextRecord(memDataIn)) {
-        return KeyState.NO_KEY;
-      }
-      // Setup the key
-      int pos = memDataIn.getPosition();
-      byte[] data = memDataIn.getData();      
-      if(currentKeyLength == IFile.RLE_MARKER) {
-        key.reset(data, prevKeyPos, prevKeyLength);
-        currentKeyLength = prevKeyLength;
-        return KeyState.SAME_KEY;
-      }      
-      key.reset(data, pos, currentKeyLength);
-      prevKeyPos = pos;
-      // Position for the next value
-      long skipped = memDataIn.skip(currentKeyLength);
-      if (skipped != currentKeyLength) {
-        throw new IOException("Rec# " + recNo + 
-            ": Failed to skip past key of length: " + 
-            currentKeyLength);
-      }
-
-      // Record the bytes read
-      bytesRead += currentKeyLength;
-      return KeyState.NEW_KEY;
-    } catch (IOException ioe) {
-      dumpOnError();
-      throw ioe;
-    }
-  }
-  
-  public void nextRawValue(DataInputBuffer value) throws IOException {
-    try {
-      int pos = memDataIn.getPosition();
-      byte[] data = memDataIn.getData();
-      value.reset(data, pos, currentValueLength);
-
-      // Position for the next record
-      long skipped = memDataIn.skip(currentValueLength);
-      if (skipped != currentValueLength) {
-        throw new IOException("Rec# " + recNo + 
-            ": Failed to skip past value of length: " + 
-            currentValueLength);
-      }
-      // Record the bytes read
-      bytesRead += currentValueLength;
-
-      ++recNo;
-    } catch (IOException ioe) {
-      dumpOnError();
-      throw ioe;
-    }
-  }
-    
-  public void close() {
-    // Release
-    dataIn = null;
-    buffer = null;
-    // Inform the MergeManager
-    if (merger != null) {
-      merger.unreserve(bufferSize);
-    }
-  }
-}
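
The reader above never copies key or value bytes; it re-points a DataInputBuffer at slices of the shared in-memory segment. A small standalone sketch of that zero-copy slicing follows; the class name is illustrative, and only Hadoop's DataInputBuffer, DataOutputBuffer and Text are assumed to be on the classpath.

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;

public class SliceReadDemo {
  public static void main(String[] args) throws Exception {
    // Serialize two Text values back-to-back into one shared buffer.
    DataOutputBuffer out = new DataOutputBuffer();
    new Text("apple").write(out);
    int firstEnd = out.getLength();
    new Text("banana").write(out);

    byte[] data = out.getData();

    // Re-point the input buffer at each record's slice; no bytes are copied.
    DataInputBuffer in = new DataInputBuffer();
    Text t = new Text();

    in.reset(data, 0, firstEnd);
    t.readFields(in);
    System.out.println("first  = " + t);   // apple

    in.reset(data, firstEnd, out.getLength() - firstEnd);
    t.readFields(in);
    System.out.println("second = " + t);   // banana
  }
}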

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java
deleted file mode 100644
index b3ebb8b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.common.sort.impl.IFileOutputStream;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class InMemoryWriter extends Writer {
-  private static final Log LOG = LogFactory.getLog(InMemoryWriter.class);
-
-  private DataOutputStream out;
-
-  public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
-    super(null);
-    this.out =
-      new DataOutputStream(new IFileOutputStream(arrayStream));
-  }
-
-  public void append(Object key, Object value) throws IOException {
-    throw new UnsupportedOperationException(
-        "InMemoryWriter.append(K key, V value)");
-  }
-
-  public void append(DataInputBuffer key, DataInputBuffer value)
-  throws IOException {
-    int keyLength = key.getLength() - key.getPosition();
-    if (keyLength < 0) {
-      throw new IOException("Negative key-length not allowed: " + keyLength +
-                            " for " + key);
-    }
-
-    boolean sameKey = (key == IFile.REPEAT_KEY);
-
-    int valueLength = value.getLength() - value.getPosition();
-    if (valueLength < 0) {
-      throw new IOException("Negative value-length not allowed: " +
-                            valueLength + " for " + value);
-    }
-
-    if(sameKey) {
-      WritableUtils.writeVInt(out, IFile.RLE_MARKER);
-      WritableUtils.writeVInt(out, valueLength);
-      out.write(value.getData(), value.getPosition(), valueLength);
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("InMemWriter.append" +
-            " key.data=" + key.getData() +
-            " key.pos=" + key.getPosition() +
-            " key.len=" +key.getLength() +
-            " val.data=" + value.getData() +
-            " val.pos=" + value.getPosition() +
-            " val.len=" + value.getLength());
-      }
-      WritableUtils.writeVInt(out, keyLength);
-      WritableUtils.writeVInt(out, valueLength);
-      out.write(key.getData(), key.getPosition(), keyLength);
-      out.write(value.getData(), value.getPosition(), valueLength);
-    }
-
-  }
-
-  public void close() throws IOException {
-    // Write EOF_MARKER for key/value length
-    WritableUtils.writeVInt(out, IFile.EOF_MARKER);
-    WritableUtils.writeVInt(out, IFile.EOF_MARKER);
-
-    // Close the stream
-    out.close();
-    out = null;
-  }
-
-}
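
The writer above frames each record as <VInt keyLength, VInt valueLength, keyBytes, valueBytes>, writes a negative RLE marker in place of the key length when the key repeats, and terminates the stream with a pair of EOF markers. Below is a hedged round-trip sketch of that framing; the marker values and class name are illustrative stand-ins rather than the real IFile constants, and only Hadoop's WritableUtils is assumed.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.WritableUtils;

public class FramingDemo {
  static final int RLE_MARKER = -2;   // illustrative stand-in
  static final int EOF_MARKER = -1;   // illustrative stand-in

  static void append(DataOutputStream out, byte[] key, byte[] value,
                     boolean sameKeyAsPrevious) throws IOException {
    if (sameKeyAsPrevious) {
      WritableUtils.writeVInt(out, RLE_MARKER);   // key elided for repeated keys
    } else {
      WritableUtils.writeVInt(out, key.length);
    }
    WritableUtils.writeVInt(out, value.length);
    if (!sameKeyAsPrevious) {
      out.write(key);
    }
    out.write(value);
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    append(out, "k1".getBytes(), "v1".getBytes(), false);
    append(out, "k1".getBytes(), "v2".getBytes(), true);   // RLE: same key again
    WritableUtils.writeVInt(out, EOF_MARKER);
    WritableUtils.writeVInt(out, EOF_MARKER);
    out.close();

    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    byte[] prevKey = null;
    while (true) {
      int keyLen = WritableUtils.readVInt(in);
      int valLen = WritableUtils.readVInt(in);
      if (keyLen == EOF_MARKER && valLen == EOF_MARKER) {
        break;                                    // end of stream
      }
      byte[] key;
      if (keyLen == RLE_MARKER) {
        key = prevKey;                            // reuse the previous key
      } else {
        key = new byte[keyLen];
        in.readFully(key);
        prevKey = key;
      }
      byte[] value = new byte[valLen];
      in.readFully(value);
      System.out.println(new String(key) + " -> " + new String(value));
    }
  }
}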

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
deleted file mode 100644
index 20ec472..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-@Private
-class MapHost {
-  
-  public static enum State {
-    IDLE,               // No map outputs available
-    BUSY,               // Map outputs are being fetched
-    PENDING,            // Known map outputs which need to be fetched
-    PENALIZED           // Host penalized due to shuffle failures
-  }
-  
-  private State state = State.IDLE;
-  private final String hostName;
-  private final int partitionId;
-  private final String baseUrl;
-  private final String identifier;
-  // Tracks attempt IDs
-  private List<InputAttemptIdentifier> maps = new ArrayList<InputAttemptIdentifier>();
-  
-  public MapHost(int partitionId, String hostName, String baseUrl) {
-    this.partitionId = partitionId;
-    this.hostName = hostName;
-    this.baseUrl = baseUrl;
-    this.identifier = createIdentifier(hostName, partitionId);
-  }
-  
-  public static String createIdentifier(String hostName, int partitionId) {
-    return hostName + ":" + Integer.toString(partitionId);
-  }
-  
-  public String getIdentifier() {
-    return identifier;
-  }
-  
-  public int getPartitionId() {
-    return partitionId;
-  }
-
-  public State getState() {
-    return state;
-  }
-
-  public String getHostName() {
-    return hostName;
-  }
-
-  public String getBaseUrl() {
-    return baseUrl;
-  }
-
-  public synchronized void addKnownMap(InputAttemptIdentifier srcAttempt) {
-    maps.add(srcAttempt);
-    if (state == State.IDLE) {
-      state = State.PENDING;
-    }
-  }
-
-  public synchronized List<InputAttemptIdentifier> getAndClearKnownMaps() {
-    List<InputAttemptIdentifier> currentKnownMaps = maps;
-    maps = new ArrayList<InputAttemptIdentifier>();
-    return currentKnownMaps;
-  }
-  
-  public synchronized void markBusy() {
-    state = State.BUSY;
-  }
-  
-  public synchronized void markPenalized() {
-    state = State.PENALIZED;
-  }
-  
-  public synchronized int getNumKnownMapOutputs() {
-    return maps.size();
-  }
-
-  /**
-   * Called when the node is done with its penalty or done copying.
-   * @return the host's new state
-   */
-  public synchronized State markAvailable() {
-    if (maps.isEmpty()) {
-      state = State.IDLE;
-    } else {
-      state = State.PENDING;
-    }
-    return state;
-  }
-  
-  @Override
-  public String toString() {
-    return hostName;
-  }
-  
-  /**
-   * Mark the host as penalized
-   */
-  public synchronized void penalize() {
-    state = State.PENALIZED;
-  }
-}
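
MapHost is essentially a small synchronized state machine over the known map outputs of one host: IDLE becomes PENDING when an output is registered, PENDING becomes BUSY while a fetcher copies, and the host returns to IDLE or PENDING when copying finishes, with PENALIZED reserved for repeated shuffle failures. The standalone sketch below mirrors those transitions; the class and method names are illustrative, not Tez code.

import java.util.ArrayList;
import java.util.List;

public class HostState {
  enum State { IDLE, BUSY, PENDING, PENALIZED }

  private State state = State.IDLE;
  private final List<String> knownOutputs = new ArrayList<>();

  synchronized void addKnownOutput(String attempt) {
    knownOutputs.add(attempt);
    if (state == State.IDLE) {
      state = State.PENDING;            // something became fetchable
    }
  }

  synchronized List<String> getAndClearKnownOutputs() {
    List<String> current = new ArrayList<>(knownOutputs);
    knownOutputs.clear();
    return current;
  }

  synchronized void markBusy()   { state = State.BUSY; }
  synchronized void penalize()   { state = State.PENALIZED; }

  synchronized State markAvailable() {
    // Called when the host is done being copied from or done with its penalty.
    state = knownOutputs.isEmpty() ? State.IDLE : State.PENDING;
    return state;
  }

  public static void main(String[] args) {
    HostState host = new HostState();
    host.addKnownOutput("attempt_0");   // IDLE -> PENDING
    List<String> toFetch = host.getAndClearKnownOutputs();
    host.markBusy();                    // a fetcher starts copying
    System.out.println("fetching " + toFetch + ", state after copy: "
        + host.markAvailable());        // nothing left -> IDLE
  }
}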