You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:16 UTC
[07/20] Rename tez-engine-api to tez-runtime-api and tez-engine is
split into 2: - tez-engine-library for user-visible Input/Output/Processor
implementations - tez-engine-internals for framework internals
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
new file mode 100644
index 0000000..8b19ce0
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
@@ -0,0 +1,125 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.broadcast.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+
+public class FileBasedKVWriter implements KVWriter {
+
+ public static final int INDEX_RECORD_LENGTH = 24;
+
+ private final Configuration conf;
+ private int numRecords = 0;
+
+ @SuppressWarnings("rawtypes")
+ private Class keyClass;
+ @SuppressWarnings("rawtypes")
+ private Class valClass;
+ private CompressionCodec codec;
+ private FileSystem rfs;
+ private IFile.Writer writer;
+
+ private TezTaskOutput ouputFileManager;
+
+ // TODO NEWTEZ Define Counters
+ // Number of records
+ // Time waiting for a write to complete, if that's possible.
+ // Size of key-value pairs written.
+
+ public FileBasedKVWriter(TezOutputContext outputContext) throws IOException {
+ this.conf = TezUtils.createConfFromUserPayload(outputContext
+ .getUserPayload());
+ this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
+ outputContext.getWorkDirs());
+
+ this.rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();
+
+ // Setup serialization
+ keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
+ valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
+
+ // Setup compression
+ if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
+ Class<? extends CompressionCodec> codecClass = ConfigUtils
+ .getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, this.conf);
+ } else {
+ codec = null;
+ }
+
+ this.ouputFileManager = TezRuntimeUtils.instantiateTaskOutputManager(conf,
+ outputContext);
+
+ initWriter();
+ }
+
+ /** Closes the data writer, then writes a one-record index file for it.
+ * @return true if at least one record was written via write(), false otherwise
+ * @throws IOException propagated from closing the writer or writing the index
+ */
+ public boolean close() throws IOException {
+ this.writer.close();
+ TezIndexRecord rec = new TezIndexRecord(0, writer.getRawLength(),
+ writer.getCompressedLength());
+ TezSpillRecord sr = new TezSpillRecord(1);
+ sr.putIndex(rec, 0);
+
+ Path indexFile = ouputFileManager
+ .getOutputIndexFileForWrite(INDEX_RECORD_LENGTH);
+ sr.writeToFile(indexFile, conf);
+ return numRecords > 0;
+ }
+
+ @Override
+ public void write(Object key, Object value) throws IOException {
+ this.writer.append(key, value);
+ numRecords++;
+ }
+
+ public void initWriter() throws IOException {
+ Path outputFile = ouputFileManager.getOutputFileForWrite();
+
+ // TODO NEWTEZ Consider making the buffer size configurable. Also consider
+ // setting up an in-memory buffer which is occasionally flushed to disk so
+ // that the output does not block.
+
+ // TODO NEWTEZ maybe use appropriate counter
+ this.writer = new IFile.Writer(conf, rfs, outputFile, keyClass, valClass,
+ codec, null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
new file mode 100644
index 0000000..d1b7ced
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
@@ -0,0 +1,148 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class ConfigUtils {
+
+ public static Class<? extends CompressionCodec> getIntermediateOutputCompressorClass(
+ Configuration conf, Class<DefaultCodec> defaultValue) {
+ Class<? extends CompressionCodec> codecClass = defaultValue;
+ String name = conf
+ .get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC);
+ if (name != null) {
+ try {
+ codecClass = conf.getClassByName(name).asSubclass(
+ CompressionCodec.class);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Compression codec " + name
+ + " was not found.", e);
+ }
+ }
+ return codecClass;
+ }
+
+ public static Class<? extends CompressionCodec> getIntermediateInputCompressorClass(
+ Configuration conf, Class<DefaultCodec> defaultValue) {
+ Class<? extends CompressionCodec> codecClass = defaultValue;
+ String name = conf
+ .get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC);
+ if (name != null) {
+ try {
+ codecClass = conf.getClassByName(name).asSubclass(
+ CompressionCodec.class);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Compression codec " + name
+ + " was not found.", e);
+ }
+ }
+ return codecClass;
+ }
+
+
+ // TODO Move defaults over to a constants file.
+
+ public static boolean shouldCompressIntermediateOutput(Configuration conf) {
+ return conf.getBoolean(
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false);
+ }
+
+ public static boolean isIntermediateInputCompressed(Configuration conf) {
+ return conf.getBoolean(
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED, false);
+ }
+
+ public static <V> Class<V> getIntermediateOutputValueClass(Configuration conf) {
+ Class<V> retv = (Class<V>) conf.getClass(
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, null,
+ Object.class);
+ return retv;
+ }
+
+ public static <V> Class<V> getIntermediateInputValueClass(Configuration conf) {
+ Class<V> retv = (Class<V>) conf.getClass(
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, null,
+ Object.class);
+ return retv;
+ }
+
+ public static <K> Class<K> getIntermediateOutputKeyClass(Configuration conf) {
+ Class<K> retv = (Class<K>) conf.getClass(
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, null,
+ Object.class);
+ return retv;
+ }
+
+ public static <K> Class<K> getIntermediateInputKeyClass(Configuration conf) {
+ Class<K> retv = (Class<K>) conf.getClass(
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, null,
+ Object.class);
+ return retv;
+ }
+
+ public static <K> RawComparator<K> getIntermediateOutputKeyComparator(Configuration conf) {
+ Class<? extends RawComparator> theClass = conf.getClass(
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, null,
+ RawComparator.class);
+ if (theClass != null)
+ return ReflectionUtils.newInstance(theClass, conf);
+ return WritableComparator.get(getIntermediateOutputKeyClass(conf).asSubclass(
+ WritableComparable.class));
+ }
+
+ public static <K> RawComparator<K> getIntermediateInputKeyComparator(Configuration conf) {
+ Class<? extends RawComparator> theClass = conf.getClass(
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, null,
+ RawComparator.class);
+ if (theClass != null)
+ return ReflectionUtils.newInstance(theClass, conf);
+ return WritableComparator.get(getIntermediateInputKeyClass(conf).asSubclass(
+ WritableComparable.class));
+ }
+
+
+
+ // TODO Fix name
+ public static <V> RawComparator<V> getInputKeySecondaryGroupingComparator(
+ Configuration conf) {
+ Class<? extends RawComparator> theClass = conf
+ .getClass(
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
+ null, RawComparator.class);
+ if (theClass == null) {
+ return getIntermediateInputKeyComparator(conf);
+ }
+
+ return ReflectionUtils.newInstance(theClass, conf);
+ }
+
+ public static boolean useNewApi(Configuration conf) {
+ return conf.getBoolean("mapred.mapper.new-api", false);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
new file mode 100644
index 0000000..33cd0f6
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.runtime.library.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+
+/** Constant names, file-name patterns and configuration keys used by the runtime library. */
+public class Constants {
+ // TODO NEWTEZ Check which of these constants are expecting specific pieces of information which are being removed - like taskAttemptId
+
+ public static final String TEZ = "tez";
+
+ public static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
+ public static final String MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index";
+ public static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
+
+ public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
+ public static final String MERGED_OUTPUT_PREFIX = ".merged";
+
+ // TODO NEWTEZ Remove this constant once the old code is removed.
+ public static final String TEZ_RUNTIME_TASK_ATTEMPT_ID =
+ "tez.runtime.task.attempt.id";
+
+ public static final String TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING =
+ "file.out";
+
+ public static final String TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING =
+ ".index";
+
+ public static final String TEZ_RUNTIME_TASK_INPUT_FILE_FORMAT_STRING =
+ "%s/task_%d.out";
+
+ public static final String TEZ_RUNTIME_JOB_CREDENTIALS =
+ "tez.runtime.job.credentials";
+
+ @Private
+ public static final String TEZ_RUNTIME_TASK_MEMORY =
+ "tez.runtime.task.memory";
+
+ public static final String TEZ_RUNTIME_TASK_OUTPUT_DIR = "output";
+
+ public static final String TEZ_RUNTIME_TASK_OUTPUT_MANAGER =
+ "tez.runtime.task.local.output.manager";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
new file mode 100644
index 0000000..a13f3f1
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+ * Container for a task number and an attempt number for the task.
+ */
+@Private
+public class InputAttemptIdentifier {
+
+ private final InputIdentifier inputIdentifier;
+ private final int attemptNumber;
+ private String pathComponent;
+
+ public InputAttemptIdentifier(int taskIndex, int attemptNumber) {
+ this(new InputIdentifier(taskIndex), attemptNumber, null);
+ }
+
+ public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent) {
+ this.inputIdentifier = inputIdentifier;
+ this.attemptNumber = attemptNumber;
+ this.pathComponent = pathComponent;
+ }
+
+ public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) {
+ this(new InputIdentifier(taskIndex), attemptNumber, pathComponent);
+ }
+
+ public InputIdentifier getInputIdentifier() {
+ return this.inputIdentifier;
+ }
+
+ public int getAttemptNumber() {
+ return attemptNumber;
+ }
+
+ public String getPathComponent() {
+ return pathComponent;
+ }
+
+ // PathComponent does not need to be part of the hashCode and equals computation.
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + attemptNumber;
+ result = prime * result
+ + ((inputIdentifier == null) ? 0 : inputIdentifier.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ InputAttemptIdentifier other = (InputAttemptIdentifier) obj;
+ if (attemptNumber != other.attemptNumber)
+ return false;
+ if (inputIdentifier == null) {
+ if (other.inputIdentifier != null)
+ return false;
+ } else if (!inputIdentifier.equals(other.inputIdentifier))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier
+ + ", attemptNumber=" + attemptNumber + ", pathComponent="
+ + pathComponent + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java
new file mode 100644
index 0000000..f4ce190
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common;
+
+public class InputIdentifier {
+
+ private final int srcTaskIndex;
+
+ public InputIdentifier(int srcTaskIndex) {
+ this.srcTaskIndex = srcTaskIndex;
+ }
+
+ public int getSrcTaskIndex() {
+ return this.srcTaskIndex;
+ }
+
+ @Override
+ public int hashCode() {
+ return srcTaskIndex;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ InputIdentifier other = (InputIdentifier) obj;
+ if (srcTaskIndex != other.srcTaskIndex)
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "InputIdentifier [srcTaskIndex=" + srcTaskIndex + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
new file mode 100644
index 0000000..2381780
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -0,0 +1,152 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.TezTaskContext;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.common.combine.Combiner;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
+
+public class TezRuntimeUtils {
+
+ private static final Log LOG = LogFactory
+ .getLog(TezRuntimeUtils.class);
+
+ public static String getTaskIdentifier(String vertexName, int taskIndex) {
+ return String.format("%s_%06d", vertexName, taskIndex);
+ }
+
+ public static String getTaskAttemptIdentifier(int taskIndex,
+ int taskAttemptNumber) {
+ return String.format("%d_%d", taskIndex, taskAttemptNumber);
+ }
+
+ // TODO Maybe include a dag name in this.
+ public static String getTaskAttemptIdentifier(String vertexName,
+ int taskIndex, int taskAttemptNumber) {
+ return String.format("%s_%06d_%02d", vertexName, taskIndex,
+ taskAttemptNumber);
+ }
+
+ /**
+ * Builds the combiner named by TEZ_RUNTIME_COMBINER_CLASS via its (TezTaskContext) constructor.
+ * @return the combiner, or null when no combiner class is configured
+ * @throws IOException if the class cannot be loaded or instantiated
+ */
+ @SuppressWarnings("unchecked")
+ public static Combiner instantiateCombiner(Configuration conf, TezTaskContext taskContext) throws IOException {
+ String className = conf.get(TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS);
+ if (className == null) {
+ LOG.info("No combiner specified via " + TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS + ". Combiner will not be used");
+ return null;
+ }
+ LOG.info("Using Combiner class: " + className);
+ Class<? extends Combiner> clazz;
+ try {
+ clazz = (Class<? extends Combiner>) conf.getClassByName(className);
+ } catch (ClassNotFoundException e) {
+ // Chain the cause so the underlying class-loading failure is not lost.
+ throw new IOException("Unable to load combiner class: " + className, e);
+ }
+ try {
+ return clazz.getConstructor(TezTaskContext.class).newInstance(taskContext);
+ } catch (SecurityException e) {
+ throw new IOException(e);
+ } catch (NoSuchMethodException e) {
+ throw new IOException(e);
+ } catch (IllegalArgumentException e) {
+ throw new IOException(e);
+ } catch (InstantiationException e) {
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(e);
+ } catch (InvocationTargetException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Builds the configured partitioner via its (Configuration) or no-argument constructor.
+ * @return the new partitioner instance
+ * @throws IOException if the class is unset, not found or cannot be instantiated
+ */
+ @SuppressWarnings("unchecked")
+ public static Partitioner instantiatePartitioner(Configuration conf) throws IOException {
+ String className = conf.get(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS);
+ if (className == null) {
+ throw new IOException("No partitioner class specified via " + TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS);
+ }
+ Class<? extends Partitioner> clazz;
+ try {
+ clazz = (Class<? extends Partitioner>) conf.getClassByName(className);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Unable to find Partitioner class in config", e);
+ }
+ LOG.info("Using partitioner class: " + clazz.getName());
+ try {
+ return clazz.getConstructor(Configuration.class).newInstance(conf);
+ } catch (SecurityException e) {
+ throw new IOException(e);
+ } catch (NoSuchMethodException e) {
+ try {
+ // Try a 0 argument constructor.
+ return clazz.newInstance();
+ } catch (InstantiationException e1) {
+ throw new IOException(e1);
+ } catch (IllegalAccessException e1) {
+ throw new IOException(e1);
+ }
+ } catch (IllegalArgumentException e) {
+ throw new IOException(e);
+ } catch (InstantiationException e) {
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(e);
+ } catch (InvocationTargetException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static TezTaskOutput instantiateTaskOutputManager(Configuration conf, TezOutputContext outputContext) {
+ Class<?> clazz = conf.getClass(Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER,
+ TezTaskOutputFiles.class);
+ try {
+ Constructor<?> ctor = clazz.getConstructor(Configuration.class, String.class);
+ ctor.setAccessible(true);
+ TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf, outputContext.getUniqueIdentifier());
+ return instance;
+ } catch (Exception e) {
+ throw new TezUncheckedException(
+ "Unable to instantiate configured TezOutputFileManager: "
+ + conf.get(Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER,
+ TezTaskOutputFiles.class.getName()), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
new file mode 100644
index 0000000..fef3356
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
@@ -0,0 +1,194 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Iterates values while keys match in sorted input.
+ *
+ * This class is not thread safe. Accessing methods from multiple threads will
+ * lead to corrupt data.
+ *
+ */
+public class ValuesIterator<KEY,VALUE> {
+ protected TezRawKeyValueIterator in; //input iterator
+ private KEY key; // current key
+ private KEY nextKey;
+ private VALUE value; // current value
+ //private boolean hasNext; // more w/ this key
+ private boolean more; // more in file
+ private RawComparator<KEY> comparator;
+ private Deserializer<KEY> keyDeserializer;
+ private Deserializer<VALUE> valDeserializer;
+ private DataInputBuffer keyIn = new DataInputBuffer();
+ private DataInputBuffer valueIn = new DataInputBuffer();
+ private TezCounter inputKeyCounter;
+ private TezCounter inputValueCounter;
+
+ private int keyCtr = 0;
+ private boolean hasMoreValues; // For the current key.
+ private boolean isFirstRecord = true;
+
+ public ValuesIterator (TezRawKeyValueIterator in,
+ RawComparator<KEY> comparator,
+ Class<KEY> keyClass,
+ Class<VALUE> valClass, Configuration conf,
+ TezCounter inputKeyCounter,
+ TezCounter inputValueCounter)
+ throws IOException {
+ this.in = in;
+ this.comparator = comparator;
+ this.inputKeyCounter = inputKeyCounter;
+ this.inputValueCounter = inputValueCounter;
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
+ this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+ this.keyDeserializer.open(keyIn);
+ this.valDeserializer = serializationFactory.getDeserializer(valClass);
+ this.valDeserializer.open(this.valueIn);
+ }
+
+ TezRawKeyValueIterator getRawIterator() { return in; }
+
+ /**
+ * Advances to the next distinct key and its group of values.
+ * @return true if another key exists, false once the input is exhausted
+ * @throws IOException propagated from reading the next key from the input
+ */
+ public boolean moveToNext() throws IOException {
+ if (isFirstRecord) {
+ readNextKey();
+ key = nextKey;
+ nextKey = null;
+ hasMoreValues = more;
+ isFirstRecord = false;
+ } else {
+ nextKey();
+ }
+ return more;
+ }
+
+ /** The current key. */
+ public KEY getKey() {
+ return key;
+ }
+
+ // TODO NEWTEZ Maybe add another method which returns an iterator instead of iterable
+
+ public Iterable<VALUE> getValues() {
+ return new Iterable<VALUE>() {
+
+ @Override
+ public Iterator<VALUE> iterator() {
+
+ return new Iterator<VALUE>() {
+
+ private final int keyNumber = keyCtr;
+
+ @Override
+ public boolean hasNext() {
+ return hasMoreValues;
+ }
+
+ @Override
+ public VALUE next() {
+ if (!hasMoreValues) {
+ throw new NoSuchElementException("iterate past last value");
+ }
+ Preconditions
+ .checkState(
+ keyNumber == keyCtr,
+ "Cannot use values iterator on the previous K-V pair after moveToNext has been invoked to move to the next K-V pair");
+
+ try {
+ readNextValue();
+ readNextKey();
+ } catch (IOException ie) {
+ throw new RuntimeException("problem advancing post rec#"+keyCtr, ie);
+ }
+ inputValueCounter.increment(1);
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Cannot remove elements");
+ }
+ };
+ }
+ };
+ }
+
+
+
+ /** Advances to the next unique key. NOTE(review): moveToNext()'s first-record path skips this, so the first key looks uncounted - confirm. */
+ private void nextKey() throws IOException {
+ // drain any unread records that still belong to the current key
+ while (hasMoreValues) {
+ readNextKey();
+ }
+ if (more) {
+ inputKeyCounter.increment(1);
+ ++keyCtr;
+ }
+
+ // swap: the next key becomes current; the old key object is handed back to the deserializer on the next read
+ KEY tmpKey = key;
+ key = nextKey;
+ nextKey = tmpKey;
+ hasMoreValues = more;
+ }
+
+ /**
+ * read the next key - which may be the same as the current key.
+ */
+ private void readNextKey() throws IOException {
+ more = in.next();
+ if (more) {
+ DataInputBuffer nextKeyBytes = in.getKey();
+ keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
+ nextKey = keyDeserializer.deserialize(nextKey);
+ hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0);
+ } else {
+ hasMoreValues = false;
+ }
+ }
+
+ /**
+ * Read the next value
+ * @throws IOException
+ */
+ private void readNextValue() throws IOException {
+ DataInputBuffer nextValueBytes = in.getValue();
+ valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
+ value = valDeserializer.deserialize(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/YARNMaster.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/YARNMaster.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/YARNMaster.java
new file mode 100644
index 0000000..8709e05
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/YARNMaster.java
@@ -0,0 +1,57 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+ @Private
+ @Unstable
+ public class YARNMaster {
+
+   /** Master states. */
+   public enum State {
+     INITIALIZING, RUNNING;
+   }
+
+   /**
+    * Returns the value configured under {@code YarnConfiguration.RM_PRINCIPAL}.
+    */
+   public static String getMasterUserName(Configuration conf) {
+     String rmPrincipal = conf.get(YarnConfiguration.RM_PRINCIPAL);
+     return rmPrincipal;
+   }
+
+   /**
+    * Returns the socket address configured under
+    * {@code YarnConfiguration.RM_ADDRESS}, using the YARN default address and
+    * port as fallbacks.
+    */
+   public static InetSocketAddress getMasterAddress(Configuration conf) {
+     InetSocketAddress rmAddress = conf.getSocketAddr(
+         YarnConfiguration.RM_ADDRESS,
+         YarnConfiguration.DEFAULT_RM_ADDRESS,
+         YarnConfiguration.DEFAULT_RM_PORT);
+     return rmAddress;
+   }
+
+   /**
+    * Builds the master's kerberos principal (for use as delegation token
+    * renewer) from the configured principal and the master's host name.
+    *
+    * @throws IOException propagated from {@code SecurityUtil.getServerPrincipal}
+    */
+   public static String getMasterPrincipal(Configuration conf)
+       throws IOException {
+     InetSocketAddress rmAddress = getMasterAddress(conf);
+     String rmHost = rmAddress.getHostName();
+     String configuredPrincipal = getMasterUserName(conf);
+     return SecurityUtil.getServerPrincipal(configuredPrincipal, rmHost);
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/combine/Combiner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/combine/Combiner.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/combine/Combiner.java
new file mode 100644
index 0000000..5b10590
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/combine/Combiner.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.combine;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+
+/**
+ *<b>Combiner Initialization</b></p> The Combiner class is picked up
+ * using the TEZ_RUNTIME_COMBINER_CLASS attribute in {@link TezJobConfig}
+ *
+ *
+ * Partitioners need to provide a single argument ({@link TezTaskContext})
+ * constructor.
+ */
+@Unstable
+@LimitedPrivate("mapreduce")
+public interface Combiner {
+ public void combine(TezRawKeyValueIterator rawIter, Writer writer)
+ throws InterruptedException, IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
new file mode 100644
index 0000000..b40df6f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
@@ -0,0 +1,120 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common.localshuffle;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.runtime.library.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+
+ /**
+ * Hands a set of local input files to {@code TezMerger.merge} and returns the
+ * resulting {@code TezRawKeyValueIterator}.
+ *
+ * NOTE: the original author marks this class as broken. As written,
+ * {@code getMapFiles()} always returns an empty array because the line that
+ * would add paths is commented out.
+ */
+ @SuppressWarnings({"rawtypes"})
+ public class LocalShuffle {
+
+ // TODO NEWTEZ This is broken.
+
+ private final TezInputContext inputContext;
+ private final Configuration conf;
+ // upper bound of the (currently empty) loop in getMapFiles()
+ private final int numInputs;
+
+ // key/value classes and key comparator, all looked up through ConfigUtils
+ private final Class keyClass;
+ private final Class valClass;
+ private final RawComparator comparator;
+
+ // raw local file system, obtained via FileSystem.getLocal(conf).getRaw()
+ private final FileSystem rfs;
+ private final int sortFactor;
+
+ private final TezCounter spilledRecordsCounter;
+ // null when ConfigUtils reports the intermediate input as uncompressed
+ private final CompressionCodec codec;
+ // only referenced from the commented-out line in getMapFiles()
+ private final TezTaskOutput mapOutputFile;
+
+ /**
+ * Reads all merge settings from {@code conf} and the spilled-records counter
+ * from {@code inputContext}.
+ *
+ * @param inputContext source of the counters and of the unique identifier
+ * @param conf configuration the merge settings are read from
+ * @param numInputs number of inputs; bounds the loop in getMapFiles()
+ * @throws IOException declared; FileSystem.getLocal(conf) is one possible source
+ */
+ public LocalShuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+ this.inputContext = inputContext;
+ this.conf = conf;
+ this.numInputs = numInputs;
+
+ this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+ this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+ this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
+
+ this.sortFactor =
+ conf.getInt(
+ TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_FACTOR);
+
+ this.rfs = FileSystem.getLocal(conf).getRaw();
+
+ this.spilledRecordsCounter = inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
+
+ // compression
+ // DefaultCodec is the fallback passed to ConfigUtils when none is configured
+ if (ConfigUtils.isIntermediateInputCompressed(conf)) {
+ Class<? extends CompressionCodec> codecClass =
+ ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+ this.codec = ReflectionUtils.newInstance(codecClass, conf);
+ } else {
+ this.codec = null;
+ }
+
+ // Always local
+ this.mapOutputFile = new TezLocalTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
+ }
+
+
+ /**
+ * Merges the files returned by getMapFiles() through TezMerger.
+ *
+ * @return the iterator produced by {@code TezMerger.merge}
+ * @throws IOException declared; propagated from the merge or getMapFiles()
+ */
+ public TezRawKeyValueIterator run() throws IOException {
+ // Copy is complete, obviously!
+
+
+ // Merge
+ // NOTE(review): the literal 'false' and the trailing nulls are positional
+ // arguments whose meaning is defined by TezMerger.merge - confirm there.
+ return TezMerger.merge(conf, rfs,
+ keyClass, valClass,
+ codec,
+ getMapFiles(),
+ false,
+ sortFactor,
+ new Path(inputContext.getUniqueIdentifier()), // TODO NEWTEZ This is likely broken
+ comparator,
+ null, spilledRecordsCounter, null, null);
+ }
+
+ /**
+ * Collects the input file paths to merge. The loop body is commented out,
+ * so the returned array is currently always empty.
+ */
+ private Path[] getMapFiles()
+ throws IOException {
+ List<Path> fileList = new ArrayList<Path>();
+ // for local jobs
+ for(int i = 0; i < numInputs; ++i) {
+ //fileList.add(mapOutputFile.getInputFile(i));
+ }
+
+ return fileList.toArray(new Path[0]);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenIdentifier.java
new file mode 100644
index 0000000..4b916fa
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenIdentifier.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * The token identifier for job token
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobTokenIdentifier extends TokenIdentifier {
+ private Text jobid;
+ public final static Text KIND_NAME = new Text("mapreduce.job");
+
+ /**
+ * Default constructor
+ */
+ public JobTokenIdentifier() {
+ this.jobid = new Text();
+ }
+
+ /**
+ * Create a job token identifier from a jobid
+ * @param jobid the jobid to use
+ */
+ public JobTokenIdentifier(Text jobid) {
+ this.jobid = jobid;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Text getKind() {
+ return KIND_NAME;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UserGroupInformation getUser() {
+ if (jobid == null || "".equals(jobid.toString())) {
+ return null;
+ }
+ return UserGroupInformation.createRemoteUser(jobid.toString());
+ }
+
+ /**
+ * Get the jobid
+ * @return the jobid
+ */
+ public Text getJobId() {
+ return jobid;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ jobid.readFields(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ jobid.write(out);
+ }
+
+ @InterfaceAudience.Private
+ public static class Renewer extends Token.TrivialRenewer {
+ @Override
+ protected Text getKind() {
+ return KIND_NAME;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSecretManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSecretManager.java
new file mode 100644
index 0000000..a03ee94
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSecretManager.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.security;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * SecretManager for job token. It can be used to cache generated job tokens.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier> {
+ private final SecretKey masterKey;
+ private final Map<String, SecretKey> currentJobTokens;
+
+ /**
+ * Convert the byte[] to a secret key
+ * @param key the byte[] to create the secret key from
+ * @return the secret key
+ */
+ public static SecretKey createSecretKey(byte[] key) {
+ return SecretManager.createSecretKey(key);
+ }
+
+ /**
+ * Compute the HMAC hash of the message using the key
+ * @param msg the message to hash
+ * @param key the key to use
+ * @return the computed hash
+ */
+ public static byte[] computeHash(byte[] msg, SecretKey key) {
+ return createPassword(msg, key);
+ }
+
+ /**
+ * Default constructor
+ */
+ public JobTokenSecretManager() {
+ this.masterKey = generateSecret();
+ this.currentJobTokens = new TreeMap<String, SecretKey>();
+ }
+
+ /**
+ * Create a new password/secret for the given job token identifier.
+ * @param identifier the job token identifier
+ * @return token password/secret
+ */
+ @Override
+ public byte[] createPassword(JobTokenIdentifier identifier) {
+ byte[] result = createPassword(identifier.getBytes(), masterKey);
+ return result;
+ }
+
+ /**
+ * Add the job token of a job to cache
+ * @param jobId the job that owns the token
+ * @param token the job token
+ */
+ public void addTokenForJob(String jobId, Token<JobTokenIdentifier> token) {
+ SecretKey tokenSecret = createSecretKey(token.getPassword());
+ synchronized (currentJobTokens) {
+ currentJobTokens.put(jobId, tokenSecret);
+ }
+ }
+
+ /**
+ * Remove the cached job token of a job from cache
+ * @param jobId the job whose token is to be removed
+ */
+ public void removeTokenForJob(String jobId) {
+ synchronized (currentJobTokens) {
+ currentJobTokens.remove(jobId);
+ }
+ }
+
+ /**
+ * Look up the token password/secret for the given jobId.
+ * @param jobId the jobId to look up
+ * @return token password/secret as SecretKey
+ * @throws InvalidToken
+ */
+ public SecretKey retrieveTokenSecret(String jobId) throws InvalidToken {
+ SecretKey tokenSecret = null;
+ synchronized (currentJobTokens) {
+ tokenSecret = currentJobTokens.get(jobId);
+ }
+ if (tokenSecret == null) {
+ throw new InvalidToken("Can't find job token for job " + jobId + " !!");
+ }
+ return tokenSecret;
+ }
+
+ /**
+ * Look up the token password/secret for the given job token identifier.
+ * @param identifier the job token identifier to look up
+ * @return token password/secret as byte[]
+ * @throws InvalidToken
+ */
+ @Override
+ public byte[] retrievePassword(JobTokenIdentifier identifier)
+ throws InvalidToken {
+ return retrieveTokenSecret(identifier.getJobId().toString()).getEncoded();
+ }
+
+ /**
+ * Create an empty job token identifier
+ * @return a newly created empty job token identifier
+ */
+ @Override
+ public JobTokenIdentifier createIdentifier() {
+ return new JobTokenIdentifier();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSelector.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSelector.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSelector.java
new file mode 100644
index 0000000..b8227ab
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSelector.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.security;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+/**
+ * Look through tokens to find the first job token that matches the service
+ * and return it.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobTokenSelector implements TokenSelector<JobTokenIdentifier> {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Token<JobTokenIdentifier> selectToken(Text service,
+ Collection<Token<? extends TokenIdentifier>> tokens) {
+ if (service == null) {
+ return null;
+ }
+ for (Token<? extends TokenIdentifier> token : tokens) {
+ if (JobTokenIdentifier.KIND_NAME.equals(token.getKind())
+ && service.equals(token.getService())) {
+ return (Token<JobTokenIdentifier>) token;
+ }
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/Master.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/Master.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/Master.java
new file mode 100644
index 0000000..5b91e0f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/Master.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.security;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+ @Private
+ @Unstable
+ public class Master {
+
+   /** Master states. */
+   public enum State {
+     INITIALIZING, RUNNING;
+   }
+
+   /**
+    * Reads {@code YarnConfiguration.RM_PRINCIPAL} from the configuration.
+    */
+   public static String getMasterUserName(Configuration conf) {
+     final String principalKey = YarnConfiguration.RM_PRINCIPAL;
+     return conf.get(principalKey);
+   }
+
+   /**
+    * Reads the socket address stored under
+    * {@code YarnConfiguration.RM_ADDRESS}; the YARN default address and port
+    * are passed as fallbacks.
+    */
+   public static InetSocketAddress getMasterAddress(Configuration conf) {
+     final String addressKey = YarnConfiguration.RM_ADDRESS;
+     final String fallbackAddress = YarnConfiguration.DEFAULT_RM_ADDRESS;
+     final int fallbackPort = YarnConfiguration.DEFAULT_RM_PORT;
+     return conf.getSocketAddr(addressKey, fallbackAddress, fallbackPort);
+   }
+
+   /**
+    * Resolves the kerberos principal used as delegation token renewer from
+    * the configured principal and the master host name.
+    *
+    * @throws IOException propagated from {@code SecurityUtil.getServerPrincipal}
+    */
+   public static String getMasterPrincipal(Configuration conf)
+       throws IOException {
+     String masterHostname = getMasterAddress(conf).getHostName();
+     String masterUserName = getMasterUserName(conf);
+     return SecurityUtil.getServerPrincipal(masterUserName, masterHostname);
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
new file mode 100644
index 0000000..b2d382c
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.security;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URL;
+
+import javax.crypto.SecretKey;
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ *
+ * utilities for generating kyes, hashes and verifying them for shuffle
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SecureShuffleUtils {
+ public static final String HTTP_HEADER_URL_HASH = "UrlHash";
+ public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
+
+ /**
+ * Base64 encoded hash of msg
+ * @param msg
+ */
+ public static String generateHash(byte[] msg, SecretKey key) {
+ return new String(Base64.encodeBase64(generateByteHash(msg, key)));
+ }
+
+ /**
+ * calculate hash of msg
+ * @param msg
+ * @return
+ */
+ private static byte[] generateByteHash(byte[] msg, SecretKey key) {
+ return JobTokenSecretManager.computeHash(msg, key);
+ }
+
+ /**
+ * verify that hash equals to HMacHash(msg)
+ * @param newHash
+ * @return true if is the same
+ */
+ private static boolean verifyHash(byte[] hash, byte[] msg, SecretKey key) {
+ byte[] msg_hash = generateByteHash(msg, key);
+ return WritableComparator.compareBytes(msg_hash, 0, msg_hash.length, hash, 0, hash.length) == 0;
+ }
+
+ /**
+ * Aux util to calculate hash of a String
+ * @param enc_str
+ * @param key
+ * @return Base64 encodedHash
+ * @throws IOException
+ */
+ public static String hashFromString(String enc_str, SecretKey key)
+ throws IOException {
+ return generateHash(enc_str.getBytes(), key);
+ }
+
+ /**
+ * verify that base64Hash is same as HMacHash(msg)
+ * @param base64Hash (Base64 encoded hash)
+ * @param msg
+ * @throws IOException if not the same
+ */
+ public static void verifyReply(String base64Hash, String msg, SecretKey key)
+ throws IOException {
+ byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
+
+ boolean res = verifyHash(hash, msg.getBytes(), key);
+
+ if(res != true) {
+ throw new IOException("Verification of the hashReply failed");
+ }
+ }
+
+ /**
+ * Shuffle specific utils - build string for encoding from URL
+ * @param url
+ * @return string for encoding
+ */
+ public static String buildMsgFrom(URL url) {
+ return buildMsgFrom(url.getPath(), url.getQuery(), url.getPort());
+ }
+ /**
+ * Shuffle specific utils - build string for encoding from URL
+ * @param request
+ * @return string for encoding
+ */
+ public static String buildMsgFrom(HttpServletRequest request ) {
+ return buildMsgFrom(request.getRequestURI(), request.getQueryString(),
+ request.getLocalPort());
+ }
+ /**
+ * Shuffle specific utils - build string for encoding from URL
+ * @param uri_path
+ * @param uri_query
+ * @return string for encoding
+ */
+ private static String buildMsgFrom(String uri_path, String uri_query, int port) {
+ return String.valueOf(port) + uri_path + "?" + uri_query;
+ }
+
+
+ /**
+ * byte array to Hex String
+ * @param ba
+ * @return string with HEX value of the key
+ */
+ public static String toHex(byte[] ba) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ for(byte b: ba) {
+ ps.printf("%x", b);
+ }
+ return baos.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/TokenCache.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/TokenCache.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/TokenCache.java
new file mode 100644
index 0000000..33373b7
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/TokenCache.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.security;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.tez.common.TezJobConfig;
+
+
+/**
+ * This class provides user facing APIs for transferring secrets from
+ * the job client to the tasks.
+ * The secrets can be stored just before submission of jobs and read during
+ * the task execution.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TokenCache {
+
+ private static final Log LOG = LogFactory.getLog(TokenCache.class);
+
+
+ /**
+ * auxiliary method to get user's secret keys..
+ * @param alias
+ * @return secret key from the storage
+ */
+ public static byte[] getSecretKey(Credentials credentials, Text alias) {
+ if(credentials == null)
+ return null;
+ return credentials.getSecretKey(alias);
+ }
+
+ /**
+ * Convenience method to obtain delegation tokens from namenodes
+ * corresponding to the paths passed.
+ * @param credentials
+ * @param ps array of paths
+ * @param conf configuration
+ * @throws IOException
+ */
+ public static void obtainTokensForNamenodes(Credentials credentials,
+ Path[] ps, Configuration conf) throws IOException {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+ obtainTokensForNamenodesInternal(credentials, ps, conf);
+ }
+
+ /**
+ * Remove jobtoken referrals which don't make sense in the context
+ * of the task execution.
+ *
+ * @param conf
+ */
+ public static void cleanUpTokenReferral(Configuration conf) {
+ conf.unset(TezJobConfig.DAG_CREDENTIALS_BINARY);
+ }
+
+ static void obtainTokensForNamenodesInternal(Credentials credentials,
+ Path[] ps, Configuration conf) throws IOException {
+ Set<FileSystem> fsSet = new HashSet<FileSystem>();
+ for(Path p: ps) {
+ fsSet.add(p.getFileSystem(conf));
+ }
+ for (FileSystem fs : fsSet) {
+ obtainTokensForNamenodesInternal(fs, credentials, conf);
+ }
+ }
+
+ /**
+ * get delegation token for a specific FS
+ * @param fs
+ * @param credentials
+ * @param p
+ * @param conf
+ * @throws IOException
+ */
+ static void obtainTokensForNamenodesInternal(FileSystem fs,
+ Credentials credentials, Configuration conf) throws IOException {
+ String delegTokenRenewer = Master.getMasterPrincipal(conf);
+ if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+ throw new IOException(
+ "Can't get Master Kerberos principal for use as renewer");
+ }
+ mergeBinaryTokens(credentials, conf);
+
+ final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer,
+ credentials);
+ if (tokens != null) {
+ for (Token<?> token : tokens) {
+ LOG.info("Got dt for " + fs.getUri() + "; "+token);
+ }
+ }
+ }
+
+ private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
+ String binaryTokenFilename =
+ conf.get(TezJobConfig.DAG_CREDENTIALS_BINARY);
+ if (binaryTokenFilename != null) {
+ Credentials binary;
+ try {
+ binary = Credentials.readTokenStorageFile(
+ new Path("file:///" + binaryTokenFilename), conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ // supplement existing tokens with the tokens in the binary file
+ creds.mergeAll(binary);
+ }
+ }
+
+ /**
+ * file name used on HDFS for generated job token
+ */
+ @InterfaceAudience.Private
+ public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
+
+ /**
+ * conf setting for job tokens cache file name
+ */
+ @InterfaceAudience.Private
+ public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
+ private static final Text JOB_TOKEN = new Text("JobToken");
+ private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
+
+ /**
+ * load job token from a file
+ * @param conf
+ * @throws IOException
+ */
+ @InterfaceAudience.Private
+ public static Credentials loadTokens(String jobTokenFile, Configuration conf)
+ throws IOException {
+ Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
+
+ Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Task: Loaded jobTokenFile from: "+
+ localJobTokenFile.toUri().getPath()
+ +"; num of sec keys = " + ts.numberOfSecretKeys() +
+ " Number of tokens " + ts.numberOfTokens());
+ }
+ return ts;
+ }
+ /**
+ * store job token
+ * @param t
+ */
+ @InterfaceAudience.Private
+ public static void setJobToken(Token<? extends TokenIdentifier> t,
+ Credentials credentials) {
+ credentials.addToken(JOB_TOKEN, t);
+ }
+ /**
+ *
+ * @return job token
+ */
+ @SuppressWarnings("unchecked")
+ @InterfaceAudience.Private
+ public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
+ return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
+ }
+
+ @InterfaceAudience.Private
+ public static void setShuffleSecretKey(byte[] key, Credentials credentials) {
+ credentials.addSecretKey(SHUFFLE_TOKEN, key);
+ }
+
+ @InterfaceAudience.Private
+ public static byte[] getShuffleSecretKey(Credentials credentials) {
+ return getSecretKey(credentials, SHUFFLE_TOKEN);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ExceptionReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ExceptionReporter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ExceptionReporter.java
new file mode 100644
index 0000000..a872ba1
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ExceptionReporter.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.impl;
+
+/**
+ * An interface for reporting exceptions to other threads
+ */
+interface ExceptionReporter {
+ void reportException(Throwable t);
+}