You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:16 UTC

[07/20] Rename tez-engine-api to tez-runtime-api and tez-engine is split into 2: - tez-engine-library for user-visible Input/Output/Processor implementations - tez-engine-internals for framework internals

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
new file mode 100644
index 0000000..8b19ce0
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
@@ -0,0 +1,125 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.broadcast.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+
+public class FileBasedKVWriter implements KVWriter {
+
+  public static final int INDEX_RECORD_LENGTH = 24;
+
+  private final Configuration conf;
+  private int numRecords = 0;
+
+  @SuppressWarnings("rawtypes")
+  private Class keyClass;
+  @SuppressWarnings("rawtypes")
+  private Class valClass;
+  private CompressionCodec codec;
+  private FileSystem rfs;
+  private IFile.Writer writer;
+
+  private TezTaskOutput ouputFileManager;
+
+  // TODO NEWTEZ Define Counters
+  // Number of records
+  // Time waiting for a write to complete, if that's possible.
+  // Size of key-value pairs written.
+
+  public FileBasedKVWriter(TezOutputContext outputContext) throws IOException {
+    this.conf = TezUtils.createConfFromUserPayload(outputContext
+        .getUserPayload());
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
+        outputContext.getWorkDirs());
+
+    this.rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();
+
+    // Setup serialization
+    keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
+    valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
+
+    // Setup compression
+    if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
+      Class<? extends CompressionCodec> codecClass = ConfigUtils
+          .getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, this.conf);
+    } else {
+      codec = null;
+    }
+
+    this.ouputFileManager = TezRuntimeUtils.instantiateTaskOutputManager(conf,
+        outputContext);
+
+    initWriter();
+  }
+
+  /**
+   * @return true if any output was generated. false otherwise
+   * @throws IOException
+   */
+  public boolean close() throws IOException {
+    this.writer.close();
+    TezIndexRecord rec = new TezIndexRecord(0, writer.getRawLength(),
+        writer.getCompressedLength());
+    TezSpillRecord sr = new TezSpillRecord(1);
+    sr.putIndex(rec, 0);
+
+    Path indexFile = ouputFileManager
+        .getOutputIndexFileForWrite(INDEX_RECORD_LENGTH);
+    sr.writeToFile(indexFile, conf);
+    return numRecords > 0;
+  }
+
+  @Override
+  public void write(Object key, Object value) throws IOException {
+    this.writer.append(key, value);
+    numRecords++;
+  }
+
+  public void initWriter() throws IOException {
+    Path outputFile = ouputFileManager.getOutputFileForWrite();
+
+    // TODO NEWTEZ Consider making the buffer size configurable. Also consider
+    // setting up an in-memory buffer which is occasionally flushed to disk so
+    // that the output does not block.
+
+    // TODO NEWTEZ maybe use appropriate counter
+    this.writer = new IFile.Writer(conf, rfs, outputFile, keyClass, valClass,
+        codec, null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
new file mode 100644
index 0000000..d1b7ced
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
@@ -0,0 +1,148 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class ConfigUtils {
+
+  public static Class<? extends CompressionCodec> getIntermediateOutputCompressorClass(
+      Configuration conf, Class<DefaultCodec> defaultValue) {
+    Class<? extends CompressionCodec> codecClass = defaultValue;
+    String name = conf
+        .get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC);
+    if (name != null) {
+      try {
+        codecClass = conf.getClassByName(name).asSubclass(
+            CompressionCodec.class);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException("Compression codec " + name
+            + " was not found.", e);
+      }
+    }
+    return codecClass;
+  }
+  
+  public static Class<? extends CompressionCodec> getIntermediateInputCompressorClass(
+      Configuration conf, Class<DefaultCodec> defaultValue) {
+    Class<? extends CompressionCodec> codecClass = defaultValue;
+    String name = conf
+        .get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC);
+    if (name != null) {
+      try {
+        codecClass = conf.getClassByName(name).asSubclass(
+            CompressionCodec.class);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException("Compression codec " + name
+            + " was not found.", e);
+      }
+    }
+    return codecClass;
+  }
+
+
+  // TODO Move defaults over to a constants file.
+  
+  public static boolean shouldCompressIntermediateOutput(Configuration conf) {
+    return conf.getBoolean(
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false);
+  }
+
+  public static boolean isIntermediateInputCompressed(Configuration conf) {
+    return conf.getBoolean(
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED, false);
+  }
+
+  public static <V> Class<V> getIntermediateOutputValueClass(Configuration conf) {
+    Class<V> retv = (Class<V>) conf.getClass(
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, null,
+        Object.class);
+    return retv;
+  }
+  
+  public static <V> Class<V> getIntermediateInputValueClass(Configuration conf) {
+    Class<V> retv = (Class<V>) conf.getClass(
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, null,
+        Object.class);
+    return retv;
+  }
+
+  public static <K> Class<K> getIntermediateOutputKeyClass(Configuration conf) {
+    Class<K> retv = (Class<K>) conf.getClass(
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, null,
+        Object.class);
+    return retv;
+  }
+
+  public static <K> Class<K> getIntermediateInputKeyClass(Configuration conf) {
+    Class<K> retv = (Class<K>) conf.getClass(
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, null,
+        Object.class);
+    return retv;
+  }
+
+  public static <K> RawComparator<K> getIntermediateOutputKeyComparator(Configuration conf) {
+    Class<? extends RawComparator> theClass = conf.getClass(
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, null,
+        RawComparator.class);
+    if (theClass != null)
+      return ReflectionUtils.newInstance(theClass, conf);
+    return WritableComparator.get(getIntermediateOutputKeyClass(conf).asSubclass(
+        WritableComparable.class));
+  }
+
+  public static <K> RawComparator<K> getIntermediateInputKeyComparator(Configuration conf) {
+    Class<? extends RawComparator> theClass = conf.getClass(
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, null,
+        RawComparator.class);
+    if (theClass != null)
+      return ReflectionUtils.newInstance(theClass, conf);
+    return WritableComparator.get(getIntermediateInputKeyClass(conf).asSubclass(
+        WritableComparable.class));
+  }
+
+  
+  
+  // TODO Fix name
+  public static <V> RawComparator<V> getInputKeySecondaryGroupingComparator(
+      Configuration conf) {
+    Class<? extends RawComparator> theClass = conf
+        .getClass(
+            TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
+            null, RawComparator.class);
+    if (theClass == null) {
+      return getIntermediateInputKeyComparator(conf);
+    }
+
+    return ReflectionUtils.newInstance(theClass, conf);
+  }
+  
+  public static boolean useNewApi(Configuration conf) {
+    return conf.getBoolean("mapred.mapper.new-api", false);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
new file mode 100644
index 0000000..33cd0f6
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.runtime.library.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+
+public class Constants {
+
+  // TODO NEWTEZ Check which of these constants are expecting specific pieces of information which are being removed - like taskAttemptId
+  
+  public static final String TEZ = "tez";
+
+  public static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
+  public static final String MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index";
+  public static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
+
+  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
+  public static String MERGED_OUTPUT_PREFIX = ".merged";
+  
+  // TODO NEWTEZ Remove this constant once the old code is removed.
+  public static final String TEZ_RUNTIME_TASK_ATTEMPT_ID = 
+      "tez.runtime.task.attempt.id";
+
+  public static final String TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING =
+      "file.out";
+
+  public static final String TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING =
+      ".index";
+
+  public static final String TEZ_RUNTIME_TASK_INPUT_FILE_FORMAT_STRING =
+      "%s/task_%d.out"; 
+
+  public static final String TEZ_RUNTIME_JOB_CREDENTIALS =
+      "tez.runtime.job.credentials";
+  
+  @Private
+  public static final String TEZ_RUNTIME_TASK_MEMORY =
+      "tez.runtime.task.memory";
+  
+  public static final String TEZ_RUNTIME_TASK_OUTPUT_DIR = "output";
+  
+  public static final String TEZ_RUNTIME_TASK_OUTPUT_MANAGER = 
+      "tez.runtime.task.local.output.manager";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
new file mode 100644
index 0000000..a13f3f1
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+ * Container for a task number and an attempt number for the task.
+ */
+@Private
+public class InputAttemptIdentifier {
+
+  private final InputIdentifier inputIdentifier;
+  private final int attemptNumber;
+  private String pathComponent;
+  
+  public InputAttemptIdentifier(int taskIndex, int attemptNumber) {
+    this(new InputIdentifier(taskIndex), attemptNumber, null);
+  }
+  
+  public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent) {
+    this.inputIdentifier = inputIdentifier;
+    this.attemptNumber = attemptNumber;
+    this.pathComponent = pathComponent;
+  }
+  
+  public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) {
+    this(new InputIdentifier(taskIndex), attemptNumber, pathComponent);
+  }
+
+  public InputIdentifier getInputIdentifier() {
+    return this.inputIdentifier;
+  }
+
+  public int getAttemptNumber() {
+    return attemptNumber;
+  }
+  
+  public String getPathComponent() {
+    return pathComponent;
+  }
+
+  // PathComponent does not need to be part of the hashCode and equals computation.
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + attemptNumber;
+    result = prime * result
+        + ((inputIdentifier == null) ? 0 : inputIdentifier.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    InputAttemptIdentifier other = (InputAttemptIdentifier) obj;
+    if (attemptNumber != other.attemptNumber)
+      return false;
+    if (inputIdentifier == null) {
+      if (other.inputIdentifier != null)
+        return false;
+    } else if (!inputIdentifier.equals(other.inputIdentifier))
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier
+        + ", attemptNumber=" + attemptNumber + ", pathComponent="
+        + pathComponent + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java
new file mode 100644
index 0000000..f4ce190
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common;
+
+public class InputIdentifier {
+
+  private final int srcTaskIndex;
+  
+  public InputIdentifier(int srcTaskIndex) {
+    this.srcTaskIndex = srcTaskIndex;
+  }
+
+  public int getSrcTaskIndex() {
+    return this.srcTaskIndex;
+  }
+
+  @Override
+  public int hashCode() {
+    return srcTaskIndex;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    InputIdentifier other = (InputIdentifier) obj;
+    if (srcTaskIndex != other.srcTaskIndex)
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "InputIdentifier [srcTaskIndex=" + srcTaskIndex + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
new file mode 100644
index 0000000..2381780
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -0,0 +1,152 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.TezTaskContext;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.common.combine.Combiner;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
+
+public class TezRuntimeUtils {
+
+  private static final Log LOG = LogFactory
+      .getLog(TezRuntimeUtils.class);
+  
+  public static String getTaskIdentifier(String vertexName, int taskIndex) {
+    return String.format("%s_%06d", vertexName, taskIndex);
+  }
+
+  public static String getTaskAttemptIdentifier(int taskIndex,
+      int taskAttemptNumber) {
+    return String.format("%d_%d", taskIndex, taskAttemptNumber);
+  }
+
+  // TODO Maybe include a dag name in this.
+  public static String getTaskAttemptIdentifier(String vertexName,
+      int taskIndex, int taskAttemptNumber) {
+    return String.format("%s_%06d_%02d", vertexName, taskIndex,
+        taskAttemptNumber);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static Combiner instantiateCombiner(Configuration conf, TezTaskContext taskContext) throws IOException {
+    Class<? extends Combiner> clazz;
+    String className = conf.get(TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS);
+    if (className == null) {
+      LOG.info("No combiner specified via " + TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS + ". Combiner will not be used");
+      return null;
+    }
+    LOG.info("Using Combiner class: " + className);
+    try {
+      clazz = (Class<? extends Combiner>) conf.getClassByName(className);
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Unable to load combiner class: " + className);
+    }
+    
+    Combiner combiner = null;
+    
+      Constructor<? extends Combiner> ctor;
+      try {
+        ctor = clazz.getConstructor(TezTaskContext.class);
+        combiner = ctor.newInstance(taskContext);
+      } catch (SecurityException e) {
+        throw new IOException(e);
+      } catch (NoSuchMethodException e) {
+        throw new IOException(e);
+      } catch (IllegalArgumentException e) {
+        throw new IOException(e);
+      } catch (InstantiationException e) {
+        throw new IOException(e);
+      } catch (IllegalAccessException e) {
+        throw new IOException(e);
+      } catch (InvocationTargetException e) {
+        throw new IOException(e);
+      }
+      return combiner;
+  }
+  
+  @SuppressWarnings("unchecked")
+  public static Partitioner instantiatePartitioner(Configuration conf)
+      throws IOException {
+    Class<? extends Partitioner> clazz;
+    try {
+      clazz = (Class<? extends Partitioner>) conf
+          .getClassByName(conf.get(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS));
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Unable to find Partitioner class in config", e);
+    }
+
+    LOG.info("Using partitioner class: " + clazz.getName());
+
+    Partitioner partitioner = null;
+
+    try {
+      Constructor<? extends Partitioner> ctorWithConf = clazz
+          .getConstructor(Configuration.class);
+      partitioner = ctorWithConf.newInstance(conf);
+    } catch (SecurityException e) {
+      throw new IOException(e);
+    } catch (NoSuchMethodException e) {
+      try {
+        // Try a 0 argument constructor.
+        partitioner = clazz.newInstance();
+      } catch (InstantiationException e1) {
+        throw new IOException(e1);
+      } catch (IllegalAccessException e1) {
+        throw new IOException(e1);
+      }
+    } catch (IllegalArgumentException e) {
+      throw new IOException(e);
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    } catch (InvocationTargetException e) {
+      throw new IOException(e);
+    }
+    return partitioner;
+  }
+  
+  public static TezTaskOutput instantiateTaskOutputManager(Configuration conf, TezOutputContext outputContext) {
+    Class<?> clazz = conf.getClass(Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER,
+        TezTaskOutputFiles.class);
+    try {
+      Constructor<?> ctor = clazz.getConstructor(Configuration.class, String.class);
+      ctor.setAccessible(true);
+      TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf, outputContext.getUniqueIdentifier());
+      return instance;
+    } catch (Exception e) {
+      throw new TezUncheckedException(
+          "Unable to instantiate configured TezOutputFileManager: "
+              + conf.get(Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER,
+                  TezTaskOutputFiles.class.getName()), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
new file mode 100644
index 0000000..fef3356
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
@@ -0,0 +1,194 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Iterates values while keys match in sorted input.
+ * 
+ * This class is not thread safe. Accessing methods from multiple threads will
+ * lead to corrupt data.
+ * 
+ */
+public class ValuesIterator<KEY,VALUE> {
+  protected TezRawKeyValueIterator in; //input iterator
+  private KEY key;               // current key
+  private KEY nextKey;
+  private VALUE value;             // current value
+  //private boolean hasNext;                      // more w/ this key
+  private boolean more;                         // more in file
+  private RawComparator<KEY> comparator;
+  private Deserializer<KEY> keyDeserializer;
+  private Deserializer<VALUE> valDeserializer;
+  private DataInputBuffer keyIn = new DataInputBuffer();
+  private DataInputBuffer valueIn = new DataInputBuffer();
+  private TezCounter inputKeyCounter;
+  private TezCounter inputValueCounter;
+  
+  private int keyCtr = 0;
+  private boolean hasMoreValues; // For the current key.
+  private boolean isFirstRecord = true;
+  
+  public ValuesIterator (TezRawKeyValueIterator in, 
+                         RawComparator<KEY> comparator, 
+                         Class<KEY> keyClass,
+                         Class<VALUE> valClass, Configuration conf,
+                         TezCounter inputKeyCounter,
+                         TezCounter inputValueCounter)
+    throws IOException {
+    this.in = in;
+    this.comparator = comparator;
+    this.inputKeyCounter = inputKeyCounter;
+    this.inputValueCounter = inputValueCounter;
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+    this.keyDeserializer.open(keyIn);
+    this.valDeserializer = serializationFactory.getDeserializer(valClass);
+    this.valDeserializer.open(this.valueIn);
+  }
+
+  TezRawKeyValueIterator getRawIterator() { return in; }
+
+  /**
+   * Move to the next K-Vs pair
+   * @return true if another pair exists, otherwise false.
+   * @throws IOException 
+   */
+  public boolean moveToNext() throws IOException {
+    if (isFirstRecord) {
+      readNextKey();
+      key = nextKey;
+      nextKey = null;
+      hasMoreValues = more;
+      isFirstRecord = false;
+    } else {
+      nextKey();
+    }
+    return more;
+  }
+  
+  /** The current key. */
+  public KEY getKey() { 
+    return key; 
+  }
+  
+  // TODO NEWTEZ Maybe add another method which returns an iterator instead of iterable
+  
+  public Iterable<VALUE> getValues() {
+    return new Iterable<VALUE>() {
+
+      @Override
+      public Iterator<VALUE> iterator() {
+        
+        return new Iterator<VALUE>() {
+
+          private final int keyNumber = keyCtr;
+          
+          @Override
+          public boolean hasNext() {
+            return hasMoreValues;
+          }
+
+          @Override
+          public VALUE next() {
+            if (!hasMoreValues) {
+              throw new NoSuchElementException("iterate past last value");
+            }
+            Preconditions
+                .checkState(
+                    keyNumber == keyCtr,
+                    "Cannot use values iterator on the previous K-V pair after moveToNext has been invoked to move to the next K-V pair");
+            
+            try {
+              readNextValue();
+              readNextKey();
+            } catch (IOException ie) {
+              throw new RuntimeException("problem advancing post rec#"+keyCtr, ie);
+            }
+            inputValueCounter.increment(1);
+            return value;
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException("Cannot remove elements");
+          }
+        };
+      }
+    };
+  }
+  
+  
+
+  /** Start processing next unique key. */
+  private void nextKey() throws IOException {
+    // read until we find a new key
+    while (hasMoreValues) { 
+      readNextKey();
+    }
+    if (more) {
+      inputKeyCounter.increment(1);
+      ++keyCtr;
+    }
+    
+    // move the next key to the current one
+    KEY tmpKey = key;
+    key = nextKey;
+    nextKey = tmpKey;
+    hasMoreValues = more;
+  }
+
+  /** 
+   * read the next key - which may be the same as the current key.
+   */
+  private void readNextKey() throws IOException {
+    more = in.next();
+    if (more) {      
+      DataInputBuffer nextKeyBytes = in.getKey();
+      keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
+      nextKey = keyDeserializer.deserialize(nextKey);
+      hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0);
+    } else {
+      hasMoreValues = false;
+    }
+  }
+
+  /**
+   * Read the next value
+   * @throws IOException
+   */
+  private void readNextValue() throws IOException {
+    DataInputBuffer nextValueBytes = in.getValue();
+    valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
+    value = valDeserializer.deserialize(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/YARNMaster.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/YARNMaster.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/YARNMaster.java
new file mode 100644
index 0000000..8709e05
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/YARNMaster.java
@@ -0,0 +1,57 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+@Private
+@Unstable
+public class YARNMaster {
+  
+  public enum State {
+    INITIALIZING, RUNNING;
+  }
+
+  public static String getMasterUserName(Configuration conf) {
+    return conf.get(YarnConfiguration.RM_PRINCIPAL);
+  }
+  
+  public static InetSocketAddress getMasterAddress(Configuration conf) {
+    return conf.getSocketAddr(
+        YarnConfiguration.RM_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_PORT);
+  }
+
+  public static String getMasterPrincipal(Configuration conf) 
+  throws IOException {
+    String masterHostname = getMasterAddress(conf).getHostName();
+    // get kerberos principal for use as delegation token renewer
+    return SecurityUtil.getServerPrincipal(
+        getMasterUserName(conf), masterHostname);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/combine/Combiner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/combine/Combiner.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/combine/Combiner.java
new file mode 100644
index 0000000..5b10590
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/combine/Combiner.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.combine;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+
+/**
+ *<b>Combiner Initialization</b></p> The Combiner class is picked up
+ * using the TEZ_RUNTIME_COMBINER_CLASS attribute in {@link TezJobConfig}
+ * 
+ * 
+ * Partitioners need to provide a single argument ({@link TezTaskContext})
+ * constructor.
+ */
+@Unstable
+@LimitedPrivate("mapreduce")
+public interface Combiner {
+  public void combine(TezRawKeyValueIterator rawIter, Writer writer)
+      throws InterruptedException, IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
new file mode 100644
index 0000000..b40df6f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
@@ -0,0 +1,120 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common.localshuffle;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.runtime.library.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+
+@SuppressWarnings({"rawtypes"})
+public class LocalShuffle {
+
+  // TODO NEWTEZ This is broken.
+
+  private final TezInputContext inputContext;
+  private final Configuration conf;
+  private final int numInputs;
+
+  private final Class keyClass;
+  private final Class valClass;
+  private final RawComparator comparator;
+
+  private final FileSystem rfs;
+  private final int sortFactor;
+  
+  private final TezCounter spilledRecordsCounter;
+  private final CompressionCodec codec;
+  private final TezTaskOutput mapOutputFile;
+
+  public LocalShuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+    this.inputContext = inputContext;
+    this.conf = conf;
+    this.numInputs = numInputs;
+    
+    this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+    this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+    this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
+    
+    this.sortFactor =
+        conf.getInt(
+            TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 
+            TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_FACTOR);
+    
+    this.rfs = FileSystem.getLocal(conf).getRaw();
+
+    this.spilledRecordsCounter = inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
+    
+ // compression
+    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
+      Class<? extends CompressionCodec> codecClass =
+          ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+      this.codec = ReflectionUtils.newInstance(codecClass, conf);
+    } else {
+      this.codec = null;
+    }
+    
+    // Always local
+    this.mapOutputFile = new TezLocalTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
+  }
+ 
+  
+  public TezRawKeyValueIterator run() throws IOException {
+    // Copy is complete, obviously! 
+
+    
+    // Merge
+    return TezMerger.merge(conf, rfs, 
+        keyClass, valClass,
+        codec, 
+        getMapFiles(),
+        false, 
+        sortFactor,
+        new Path(inputContext.getUniqueIdentifier()), // TODO NEWTEZ This is likely broken 
+        comparator,
+        null, spilledRecordsCounter, null, null);
+  }
+  
+  private Path[] getMapFiles() 
+  throws IOException {
+    List<Path> fileList = new ArrayList<Path>();
+      // for local jobs
+      for(int i = 0; i < numInputs; ++i) {
+        //fileList.add(mapOutputFile.getInputFile(i));
+      }
+      
+    return fileList.toArray(new Path[0]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenIdentifier.java
new file mode 100644
index 0000000..4b916fa
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenIdentifier.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * The token identifier for job token
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobTokenIdentifier extends TokenIdentifier {
+  private Text jobid;
+  public final static Text KIND_NAME = new Text("mapreduce.job");
+  
+  /**
+   * Default constructor
+   */
+  public JobTokenIdentifier() {
+    this.jobid = new Text();
+  }
+
+  /**
+   * Create a job token identifier from a jobid
+   * @param jobid the jobid to use
+   */
+  public JobTokenIdentifier(Text jobid) {
+    this.jobid = jobid;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public Text getKind() {
+    return KIND_NAME;
+  }
+  
+  /** {@inheritDoc} */
+  @Override
+  public UserGroupInformation getUser() {
+    if (jobid == null || "".equals(jobid.toString())) {
+      return null;
+    }
+    return UserGroupInformation.createRemoteUser(jobid.toString());
+  }
+  
+  /**
+   * Get the jobid
+   * @return the jobid
+   */
+  public Text getJobId() {
+    return jobid;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    jobid.readFields(in);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    jobid.write(out);
+  }
+
+  @InterfaceAudience.Private
+  public static class Renewer extends Token.TrivialRenewer {
+    @Override
+    protected Text getKind() {
+      return KIND_NAME;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSecretManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSecretManager.java
new file mode 100644
index 0000000..a03ee94
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSecretManager.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.security;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * SecretManager for job token. It can be used to cache generated job tokens.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier> {
+  private final SecretKey masterKey;
+  private final Map<String, SecretKey> currentJobTokens;
+
+  /**
+   * Convert the byte[] to a secret key
+   * @param key the byte[] to create the secret key from
+   * @return the secret key
+   */
+  public static SecretKey createSecretKey(byte[] key) {
+    return SecretManager.createSecretKey(key);
+  }
+  
+  /**
+   * Compute the HMAC hash of the message using the key
+   * @param msg the message to hash
+   * @param key the key to use
+   * @return the computed hash
+   */
+  public static byte[] computeHash(byte[] msg, SecretKey key) {
+    return createPassword(msg, key);
+  }
+  
+  /**
+   * Default constructor
+   */
+  public JobTokenSecretManager() {
+    this.masterKey = generateSecret();
+    this.currentJobTokens = new TreeMap<String, SecretKey>();
+  }
+  
+  /**
+   * Create a new password/secret for the given job token identifier.
+   * @param identifier the job token identifier
+   * @return token password/secret
+   */
+  @Override
+  public byte[] createPassword(JobTokenIdentifier identifier) {
+    byte[] result = createPassword(identifier.getBytes(), masterKey);
+    return result;
+  }
+
+  /**
+   * Add the job token of a job to cache
+   * @param jobId the job that owns the token
+   * @param token the job token
+   */
+  public void addTokenForJob(String jobId, Token<JobTokenIdentifier> token) {
+    SecretKey tokenSecret = createSecretKey(token.getPassword());
+    synchronized (currentJobTokens) {
+      currentJobTokens.put(jobId, tokenSecret);
+    }
+  }
+
+  /**
+   * Remove the cached job token of a job from cache
+   * @param jobId the job whose token is to be removed
+   */
+  public void removeTokenForJob(String jobId) {
+    synchronized (currentJobTokens) {
+      currentJobTokens.remove(jobId);
+    }
+  }
+  
+  /**
+   * Look up the token password/secret for the given jobId.
+   * @param jobId the jobId to look up
+   * @return token password/secret as SecretKey
+   * @throws InvalidToken
+   */
+  public SecretKey retrieveTokenSecret(String jobId) throws InvalidToken {
+    SecretKey tokenSecret = null;
+    synchronized (currentJobTokens) {
+      tokenSecret = currentJobTokens.get(jobId);
+    }
+    if (tokenSecret == null) {
+      throw new InvalidToken("Can't find job token for job " + jobId + " !!");
+    }
+    return tokenSecret;
+  }
+  
+  /**
+   * Look up the token password/secret for the given job token identifier.
+   * @param identifier the job token identifier to look up
+   * @return token password/secret as byte[]
+   * @throws InvalidToken
+   */
+  @Override
+  public byte[] retrievePassword(JobTokenIdentifier identifier)
+      throws InvalidToken {
+    return retrieveTokenSecret(identifier.getJobId().toString()).getEncoded();
+  }
+
+  /**
+   * Create an empty job token identifier
+   * @return a newly created empty job token identifier
+   */
+  @Override
+  public JobTokenIdentifier createIdentifier() {
+    return new JobTokenIdentifier();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSelector.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSelector.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSelector.java
new file mode 100644
index 0000000..b8227ab
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSelector.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.security;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+/**
+ * Look through tokens to find the first job token that matches the service
+ * and return it.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobTokenSelector implements TokenSelector<JobTokenIdentifier> {
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Token<JobTokenIdentifier> selectToken(Text service,
+      Collection<Token<? extends TokenIdentifier>> tokens) {
+    if (service == null) {
+      return null;
+    }
+    for (Token<? extends TokenIdentifier> token : tokens) {
+      if (JobTokenIdentifier.KIND_NAME.equals(token.getKind())
+          && service.equals(token.getService())) {
+        return (Token<JobTokenIdentifier>) token;
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/Master.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/Master.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/Master.java
new file mode 100644
index 0000000..5b91e0f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/Master.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.security;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+@Private
+@Unstable
+public class Master {
+
+  public enum State {
+    INITIALIZING, RUNNING;
+  }
+
+  public static String getMasterUserName(Configuration conf) {
+    return conf.get(YarnConfiguration.RM_PRINCIPAL);
+  }
+
+  public static InetSocketAddress getMasterAddress(Configuration conf) {
+    return conf
+        .getSocketAddr(YarnConfiguration.RM_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_PORT);
+  }
+
+  public static String getMasterPrincipal(Configuration conf)
+      throws IOException {
+    String masterHostname = getMasterAddress(conf).getHostName();
+    // get kerberos principal for use as delegation token renewer
+    return SecurityUtil.getServerPrincipal(getMasterUserName(conf),
+        masterHostname);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
new file mode 100644
index 0000000..b2d382c
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.security;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URL;
+
+import javax.crypto.SecretKey;
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * 
+ * utilities for generating kyes, hashes and verifying them for shuffle
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SecureShuffleUtils {
+  public static final String HTTP_HEADER_URL_HASH = "UrlHash";
+  public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
+  
+  /**
+   * Base64 encoded hash of msg
+   * @param msg
+   */
+  public static String generateHash(byte[] msg, SecretKey key) {
+    return new String(Base64.encodeBase64(generateByteHash(msg, key)));
+  }
+  
+  /**
+   * calculate hash of msg
+   * @param msg
+   * @return
+   */
+  private static byte[] generateByteHash(byte[] msg, SecretKey key) {
+    return JobTokenSecretManager.computeHash(msg, key);
+  }
+  
+  /**
+   * verify that hash equals to HMacHash(msg)
+   * @param newHash
+   * @return true if is the same
+   */
+  private static boolean verifyHash(byte[] hash, byte[] msg, SecretKey key) {
+    byte[] msg_hash = generateByteHash(msg, key);
+    return WritableComparator.compareBytes(msg_hash, 0, msg_hash.length, hash, 0, hash.length) == 0;
+  }
+  
+  /**
+   * Aux util to calculate hash of a String
+   * @param enc_str
+   * @param key
+   * @return Base64 encodedHash
+   * @throws IOException
+   */
+  public static String hashFromString(String enc_str, SecretKey key) 
+  throws IOException {
+    return generateHash(enc_str.getBytes(), key); 
+  }
+  
+  /**
+   * verify that base64Hash is same as HMacHash(msg)  
+   * @param base64Hash (Base64 encoded hash)
+   * @param msg
+   * @throws IOException if not the same
+   */
+  public static void verifyReply(String base64Hash, String msg, SecretKey key)
+  throws IOException {
+    byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
+    
+    boolean res = verifyHash(hash, msg.getBytes(), key);
+    
+    if(res != true) {
+      throw new IOException("Verification of the hashReply failed");
+    }
+  }
+  
+  /**
+   * Shuffle specific utils - build string for encoding from URL
+   * @param url
+   * @return string for encoding
+   */
+  public static String buildMsgFrom(URL url) {
+    return buildMsgFrom(url.getPath(), url.getQuery(), url.getPort());
+  }
+  /**
+   * Shuffle specific utils - build string for encoding from URL
+   * @param request
+   * @return string for encoding
+   */
+  public static String buildMsgFrom(HttpServletRequest request ) {
+    return buildMsgFrom(request.getRequestURI(), request.getQueryString(),
+        request.getLocalPort());
+  }
+  /**
+   * Shuffle specific utils - build string for encoding from URL
+   * @param uri_path
+   * @param uri_query
+   * @return string for encoding
+   */
+  private static String buildMsgFrom(String uri_path, String uri_query, int port) {
+    return String.valueOf(port) + uri_path + "?" + uri_query;
+  }
+  
+  
+  /**
+   * byte array to Hex String
+   * @param ba
+   * @return string with HEX value of the key
+   */
+  public static String toHex(byte[] ba) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintStream ps = new PrintStream(baos);
+    for(byte b: ba) {
+      ps.printf("%x", b);
+    }
+    return baos.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/TokenCache.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/TokenCache.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/TokenCache.java
new file mode 100644
index 0000000..33373b7
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/TokenCache.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.security;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.tez.common.TezJobConfig;
+
+
+/**
+ * This class provides user facing APIs for transferring secrets from
+ * the job client to the tasks.
+ * The secrets can be stored just before submission of jobs and read during
+ * the task execution.  
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TokenCache {
+  
+  private static final Log LOG = LogFactory.getLog(TokenCache.class);
+
+  
+  /**
+   * auxiliary method to get user's secret keys..
+   * @param alias
+   * @return secret key from the storage
+   */
+  public static byte[] getSecretKey(Credentials credentials, Text alias) {
+    if(credentials == null)
+      return null;
+    return credentials.getSecretKey(alias);
+  }
+  
+  /**
+   * Convenience method to obtain delegation tokens from namenodes 
+   * corresponding to the paths passed.
+   * @param credentials
+   * @param ps array of paths
+   * @param conf configuration
+   * @throws IOException
+   */
+  public static void obtainTokensForNamenodes(Credentials credentials,
+      Path[] ps, Configuration conf) throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    obtainTokensForNamenodesInternal(credentials, ps, conf);
+  }
+
+  /**
+   * Remove jobtoken referrals which don't make sense in the context
+   * of the task execution.
+   *
+   * @param conf
+   */
+  public static void cleanUpTokenReferral(Configuration conf) {
+    conf.unset(TezJobConfig.DAG_CREDENTIALS_BINARY);
+  }
+
+  static void obtainTokensForNamenodesInternal(Credentials credentials,
+      Path[] ps, Configuration conf) throws IOException {
+    Set<FileSystem> fsSet = new HashSet<FileSystem>();
+    for(Path p: ps) {
+      fsSet.add(p.getFileSystem(conf));
+    }
+    for (FileSystem fs : fsSet) {
+      obtainTokensForNamenodesInternal(fs, credentials, conf);
+    }
+  }
+
+  /**
+   * get delegation token for a specific FS
+   * @param fs
+   * @param credentials
+   * @param p
+   * @param conf
+   * @throws IOException
+   */
+  static void obtainTokensForNamenodesInternal(FileSystem fs, 
+      Credentials credentials, Configuration conf) throws IOException {
+    String delegTokenRenewer = Master.getMasterPrincipal(conf);
+    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+      throw new IOException(
+          "Can't get Master Kerberos principal for use as renewer");
+    }
+    mergeBinaryTokens(credentials, conf);
+
+    final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer,
+                                                     credentials);
+    if (tokens != null) {
+      for (Token<?> token : tokens) {
+        LOG.info("Got dt for " + fs.getUri() + "; "+token);
+      }
+    }
+  }
+
+  private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
+    String binaryTokenFilename =
+        conf.get(TezJobConfig.DAG_CREDENTIALS_BINARY);
+    if (binaryTokenFilename != null) {
+      Credentials binary;
+      try {
+        binary = Credentials.readTokenStorageFile(
+            new Path("file:///" +  binaryTokenFilename), conf);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      // supplement existing tokens with the tokens in the binary file
+      creds.mergeAll(binary);
+    }
+  }
+  
+  /**
+   * file name used on HDFS for generated job token
+   */
+  @InterfaceAudience.Private
+  public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
+
+  /**
+   * conf setting for job tokens cache file name
+   */
+  @InterfaceAudience.Private
+  public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
+  private static final Text JOB_TOKEN = new Text("JobToken");
+  private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
+  
+  /**
+   * load job token from a file
+   * @param conf
+   * @throws IOException
+   */
+  @InterfaceAudience.Private
+  public static Credentials loadTokens(String jobTokenFile, Configuration conf) 
+  throws IOException {
+    Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
+
+    Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Task: Loaded jobTokenFile from: "+
+          localJobTokenFile.toUri().getPath() 
+          +"; num of sec keys  = " + ts.numberOfSecretKeys() +
+          " Number of tokens " +  ts.numberOfTokens());
+    }
+    return ts;
+  }
+  /**
+   * store job token
+   * @param t
+   */
+  @InterfaceAudience.Private
+  public static void setJobToken(Token<? extends TokenIdentifier> t, 
+      Credentials credentials) {
+    credentials.addToken(JOB_TOKEN, t);
+  }
+  /**
+   * 
+   * @return job token
+   */
+  @SuppressWarnings("unchecked")
+  @InterfaceAudience.Private
+  public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
+    return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
+  }
+
+  @InterfaceAudience.Private
+  public static void setShuffleSecretKey(byte[] key, Credentials credentials) {
+    credentials.addSecretKey(SHUFFLE_TOKEN, key);
+  }
+
+  @InterfaceAudience.Private
+  public static byte[] getShuffleSecretKey(Credentials credentials) {
+    return getSecretKey(credentials, SHUFFLE_TOKEN);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ExceptionReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ExceptionReporter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ExceptionReporter.java
new file mode 100644
index 0000000..a872ba1
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ExceptionReporter.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.impl;
+
+/**
+ * An interface for reporting exceptions to other threads
+ */
+interface ExceptionReporter {
+  void reportException(Throwable t);
+}