You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/02/18 10:58:09 UTC
[15/24] tez git commit: TEZ-3090. MRInput should make dagIdentifier,
vertexIdentifier, etc. available to the InputFormat jobConf. (Siddharth Seth)
TEZ-3090. MRInput should make dagIdentifier, vertexIdentifier, etc.
available to the InputFormat jobConf. (Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/99c85d3f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/99c85d3f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/99c85d3f
Branch: refs/heads/TEZ-2980
Commit: 99c85d3f95ac9bbee9c507c4efdc2757ea5b8542
Parents: f352cfb
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Feb 10 20:37:50 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Feb 10 20:37:50 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/mapreduce/hadoop/MRInputHelpers.java | 124 +++++++++++++++
.../org/apache/tez/mapreduce/input/MRInput.java | 16 +-
.../tez/mapreduce/input/base/MRInputBase.java | 12 ++
.../apache/tez/mapreduce/input/TestMRInput.java | 151 +++++++++++++++++++
.../tez/mapreduce/input/TestMultiMRInput.java | 2 +
6 files changed, 305 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4d7ae6b..c769843 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3090. MRInput should make dagIdentifier, vertexIdentifier, etc available to the InputFormat jobConf.
TEZ-3093. CriticalPathAnalyzer should be accessible via zeppelin.
TEZ-3089. TaskConcurrencyAnalyzer can return negative task count with very large jobs.
TEZ-2307. Possible wrong error message when submitting new dag
http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
index 30e4a8c..325e7b2 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
+import org.apache.tez.runtime.api.InputContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -707,4 +708,127 @@ public class MRInputHelpers {
return UserPayload.create(userPayloadBuilder.build().toByteString().asReadOnlyByteBuffer());
}
+
+ private static String getStringProperty(Configuration conf, String propertyName) {
+ Preconditions.checkNotNull(conf, "Configuration must be provided");
+ String propertyString = conf.get(propertyName);
+ Preconditions.checkNotNull(propertyString,
+ "Property " + propertyName + " not found in provided configuration");
+ return propertyString;
+ }
+
+  /**
+   * Looks up a property that must be present and parses it as a decimal int.
+   *
+   * @param conf configuration to read from; must not be null
+   * @param propertyName key to look up
+   * @return the parsed int value
+   * @throws NullPointerException if {@code conf} is null or the property is not set
+   * @throws NumberFormatException if the stored value is not a valid int; unlike the bare
+   *     message from {@link Integer#parseInt(String)}, the message names the property
+   */
+  private static int getIntProperty(Configuration conf, String propertyName) {
+    String propertyString = getStringProperty(conf, propertyName);
+    try {
+      return Integer.parseInt(propertyString);
+    } catch (NumberFormatException e) {
+      // Keep the exception type callers may catch, but say which property was malformed.
+      NumberFormatException withContext = new NumberFormatException(
+          "Property " + propertyName + " has a non-integer value: " + propertyString);
+      withContext.initCause(e);
+      throw withContext;
+    }
+  }
+
+  /**
+   * Returns the DAG index that the MR input ({@code MRInputBase}) recorded in the job
+   * configuration under {@link MRInput#TEZ_MAPREDUCE_DAG_INDEX}.
+   *
+   * @param conf configuration instance, e.g. the JobConf handed to the InputFormat
+   * @return dag index
+   * @throws NullPointerException if {@code conf} is null or the property is not set
+   * @throws NumberFormatException if the stored value is not a valid int
+   * @see InputContext#getDagIdentifier()
+   */
+  @Public
+  public static int getDagIndex(Configuration conf) {
+    return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_INDEX);
+  }
+
+  /**
+   * Returns the vertex index that the MR input ({@code MRInputBase}) recorded in the job
+   * configuration under {@link MRInput#TEZ_MAPREDUCE_VERTEX_INDEX}.
+   *
+   * @param conf configuration instance, e.g. the JobConf handed to the InputFormat
+   * @return vertex index
+   * @throws NullPointerException if {@code conf} is null or the property is not set
+   * @throws NumberFormatException if the stored value is not a valid int
+   * @see InputContext#getTaskVertexIndex()
+   */
+  @Public
+  public static int getVertexIndex(Configuration conf) {
+    return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_VERTEX_INDEX);
+  }
+
+  /**
+   * Returns the task index that the MR input ({@code MRInputBase}) recorded in the job
+   * configuration under {@link MRInput#TEZ_MAPREDUCE_TASK_INDEX}.
+   *
+   * @param conf configuration instance, e.g. the JobConf handed to the InputFormat
+   * @return task index
+   * @throws NullPointerException if {@code conf} is null or the property is not set
+   * @throws NumberFormatException if the stored value is not a valid int
+   * @see InputContext#getTaskIndex()
+   */
+  @Public
+  public static int getTaskIndex(Configuration conf) {
+    return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_INDEX);
+  }
+
+  /**
+   * Returns the task attempt number that the MR input ({@code MRInputBase}) recorded in the
+   * job configuration under {@link MRInput#TEZ_MAPREDUCE_TASK_ATTEMPT_INDEX}.
+   *
+   * @param conf configuration instance, e.g. the JobConf handed to the InputFormat
+   * @return task attempt index
+   * @throws NullPointerException if {@code conf} is null or the property is not set
+   * @throws NumberFormatException if the stored value is not a valid int
+   * @see InputContext#getTaskAttemptNumber()
+   */
+  @Public
+  public static int getTaskAttemptIndex(Configuration conf) {
+    return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_INDEX);
+  }
+
+  /**
+   * Returns the input index that the MR input ({@code MRInputBase}) recorded in the job
+   * configuration under {@link MRInput#TEZ_MAPREDUCE_INPUT_INDEX}.
+   *
+   * @param conf configuration instance, e.g. the JobConf handed to the InputFormat
+   * @return input index
+   * @throws NullPointerException if {@code conf} is null or the property is not set
+   * @throws NumberFormatException if the stored value is not a valid int
+   * @see InputContext#getInputIndex()
+   */
+  @Public
+  public static int getInputIndex(Configuration conf) {
+    return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_INPUT_INDEX);
+  }
+
+  /**
+   * Returns the DAG name that the MR input ({@code MRInputBase}) recorded in the job
+   * configuration under {@link MRInput#TEZ_MAPREDUCE_DAG_NAME}.
+   *
+   * @param conf configuration instance, e.g. the JobConf handed to the InputFormat
+   * @return dag name
+   * @throws NullPointerException if {@code conf} is null or the property is not set
+   * @see InputContext#getDAGName()
+   */
+  @Public
+  public static String getDagName(Configuration conf) {
+    return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_NAME);
+  }
+
+  /**
+   * Returns the name of the vertex running this task, as recorded by the MR input
+   * ({@code MRInputBase}) under {@link MRInput#TEZ_MAPREDUCE_VERTEX_NAME}.
+   *
+   * @param conf configuration instance, e.g. the JobConf handed to the InputFormat
+   * @return vertex name
+   * @throws NullPointerException if {@code conf} is null or the property is not set
+   * @see InputContext#getTaskVertexName()
+   */
+  @Public
+  public static String getVertexName(Configuration conf) {
+    return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_VERTEX_NAME);
+  }
+
+  /**
+   * Returns the source vertex name of this input, as recorded by the MR input
+   * ({@code MRInputBase}) under {@link MRInput#TEZ_MAPREDUCE_INPUT_NAME}.
+   *
+   * @param conf configuration instance, e.g. the JobConf handed to the InputFormat
+   * @return source vertex name of the input
+   * @throws NullPointerException if {@code conf} is null or the property is not set
+   * @see InputContext#getSourceVertexName()
+   */
+  @Public
+  public static String getInputName(Configuration conf) {
+    return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_INPUT_NAME);
+  }
+
+  /**
+   * Returns the application id that the MR input ({@code MRInputBase}) recorded in the job
+   * configuration under {@link MRInput#TEZ_MAPREDUCE_APPLICATION_ID}.
+   *
+   * @param conf configuration instance, e.g. the JobConf handed to the InputFormat
+   * @return the application id in its {@code toString()} form
+   * @throws NullPointerException if {@code conf} is null or the property is not set
+   * @see InputContext#getApplicationId()
+   */
+  @Public
+  public static String getApplicationIdString(Configuration conf) {
+    return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_APPLICATION_ID);
+  }
+
+  /**
+   * Returns the input's unique identifier that the MR input ({@code MRInputBase}) recorded
+   * in the job configuration under {@link MRInput#TEZ_MAPREDUCE_UNIQUE_IDENTIFIER}.
+   *
+   * @param conf configuration instance, e.g. the JobConf handed to the InputFormat
+   * @return unique identifier for the input
+   * @throws NullPointerException if {@code conf} is null or the property is not set
+   * @see InputContext#getUniqueIdentifier()
+   */
+  @Public
+  public static String getUniqueIdentifier(Configuration conf) {
+    return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_UNIQUE_IDENTIFIER);
+  }
+
+  /**
+   * Returns the DAG attempt number that the MR input ({@code MRInputBase}) recorded in the
+   * job configuration under {@link MRInput#TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER}.
+   *
+   * @param conf configuration instance, e.g. the JobConf handed to the InputFormat
+   * @return dag attempt number
+   * @throws NullPointerException if {@code conf} is null or the property is not set
+   * @throws NumberFormatException if the stored value is not a valid int
+   * @see InputContext#getDAGAttemptNumber()
+   */
+  @Public
+  public static int getDagAttemptNumber(Configuration conf) {
+    return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER);
+  }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index b68d135..4a4ba86 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -82,7 +82,21 @@ import com.google.common.collect.Lists;
*/
@Public
public class MRInput extends MRInputBase {
-
+
+  // JobConf keys under which MRInputBase publishes attributes of its InputContext, so that
+  // InputFormat implementations can read them. Prefer the MRInputHelpers accessors
+  // (e.g. MRInputHelpers.getDagIndex) over reading these keys directly.
+  @Private public static final String TEZ_MAPREDUCE_DAG_INDEX = "tez.mapreduce.dag.index";
+  @Private public static final String TEZ_MAPREDUCE_DAG_NAME = "tez.mapreduce.dag.name";
+  @Private public static final String TEZ_MAPREDUCE_VERTEX_INDEX = "tez.mapreduce.vertex.index";
+  @Private public static final String TEZ_MAPREDUCE_VERTEX_NAME = "tez.mapreduce.vertex.name";
+  @Private public static final String TEZ_MAPREDUCE_TASK_INDEX = "tez.mapreduce.task.index";
+  @Private public static final String TEZ_MAPREDUCE_TASK_ATTEMPT_INDEX = "tez.mapreduce.task.attempt.index";
+  @Private public static final String TEZ_MAPREDUCE_INPUT_INDEX = "tez.mapreduce.input.index";
+  @Private public static final String TEZ_MAPREDUCE_INPUT_NAME = "tez.mapreduce.input.name";
+  @Private public static final String TEZ_MAPREDUCE_APPLICATION_ID = "tez.mapreduce.application.id";
+  @Private public static final String TEZ_MAPREDUCE_UNIQUE_IDENTIFIER = "tez.mapreduce.unique.identifier";
+  @Private public static final String TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER = "tez.mapreduce.dag.attempt.number";
+
/**
* Helper class to configure {@link MRInput}
*
http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
index e4aa7e2..230f55e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
@@ -32,6 +32,7 @@ import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.Event;
@@ -96,6 +97,17 @@ public abstract class MRInputBase extends AbstractLogicalInput {
taskAttemptId.toString());
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
getContext().getDAGAttemptNumber());
+ jobConf.setInt(MRInput.TEZ_MAPREDUCE_DAG_INDEX, getContext().getDagIdentifier());
+ jobConf.setInt(MRInput.TEZ_MAPREDUCE_VERTEX_INDEX, getContext().getTaskVertexIndex());
+ jobConf.setInt(MRInput.TEZ_MAPREDUCE_TASK_INDEX, getContext().getTaskIndex());
+ jobConf.setInt(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_INDEX, getContext().getTaskAttemptNumber());
+ jobConf.set(MRInput.TEZ_MAPREDUCE_DAG_NAME, getContext().getDAGName());
+ jobConf.set(MRInput.TEZ_MAPREDUCE_VERTEX_NAME, getContext().getTaskVertexName());
+ jobConf.setInt(MRInput.TEZ_MAPREDUCE_INPUT_INDEX, getContext().getInputIndex());
+ jobConf.set(MRInput.TEZ_MAPREDUCE_INPUT_NAME, getContext().getSourceVertexName());
+ jobConf.set(MRInput.TEZ_MAPREDUCE_APPLICATION_ID, getContext().getApplicationId().toString());
+ jobConf.set(MRInput.TEZ_MAPREDUCE_UNIQUE_IDENTIFIER, getContext().getUniqueIdentifier());
+ jobConf.setInt(MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER, getContext().getDAGAttemptNumber());
this.inputRecordCounter = getContext().getCounters().findCounter(
TaskCounter.INPUT_RECORDS_PROCESSED);
http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
index 448b90c..b42ef25 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
@@ -14,6 +14,7 @@
package org.apache.tez.mapreduce.input;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -22,17 +23,28 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.junit.Test;
public class TestMRInput {
@@ -47,6 +59,10 @@ public class TestMRInput {
ApplicationId applicationId = ApplicationId.newInstance(1000, 1);
doReturn(dsd.getInputDescriptor().getUserPayload()).when(inputContext).getUserPayload();
doReturn(applicationId).when(inputContext).getApplicationId();
+ doReturn("dagName").when(inputContext).getDAGName();
+ doReturn("vertexName").when(inputContext).getTaskVertexName();
+ doReturn("inputName").when(inputContext).getSourceVertexName();
+ doReturn("uniqueIdentifier").when(inputContext).getUniqueIdentifier();
doReturn(1).when(inputContext).getTaskIndex();
doReturn(1).when(inputContext).getTaskAttemptNumber();
doReturn(new TezCounters()).when(inputContext).getCounters();
@@ -69,4 +85,139 @@ public class TestMRInput {
assertTrue(e instanceof IllegalStateException);
}
}
+
+ // Values returned by the mocked InputContext in testAttributesInJobConf and expected back
+ // from the MRInputHelpers accessors inside TestInputFormat#getRecordReader. Every value is
+ // distinct, so a property wired to the wrong InputContext getter makes an assertion fail.
+ private static final String TEST_ATTRIBUTES_DAG_NAME = "dagName";
+ private static final String TEST_ATTRIBUTES_VERTEX_NAME = "vertexName";
+ private static final String TEST_ATTRIBUTES_INPUT_NAME = "inputName";
+ private static final ApplicationId TEST_ATTRIBUTES_APPLICATION_ID = ApplicationId.newInstance(0, 0);
+ private static final String TEST_ATTRIBUTES_UNIQUE_IDENTIFIER = "uniqueId";
+ private static final int TEST_ATTRIBUTES_DAG_INDEX = 1000;
+ private static final int TEST_ATTRIBUTES_VERTEX_INDEX = 2000;
+ private static final int TEST_ATTRIBUTES_TASK_INDEX = 3000;
+ private static final int TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX = 4000;
+ private static final int TEST_ATTRIBUTES_INPUT_INDEX = 5000;
+ private static final int TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER = 6000;
+
+  /**
+   * Verifies that the InputContext attributes MRInput copies into the JobConf are readable
+   * through the MRInputHelpers accessors from inside an InputFormat.
+   */
+  @Test(timeout = 5000)
+  public void testAttributesInJobConf() throws Exception {
+    // TestInputFormat.invoked is static and nothing else clears it. Reset it so that a
+    // previous run in the same JVM cannot satisfy the final assertion on its own.
+    TestInputFormat.invoked.set(false);
+
+    InputContext inputContext = mock(InputContext.class);
+    doReturn(TEST_ATTRIBUTES_DAG_INDEX).when(inputContext).getDagIdentifier();
+    doReturn(TEST_ATTRIBUTES_VERTEX_INDEX).when(inputContext).getTaskVertexIndex();
+    doReturn(TEST_ATTRIBUTES_TASK_INDEX).when(inputContext).getTaskIndex();
+    doReturn(TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX).when(inputContext).getTaskAttemptNumber();
+    doReturn(TEST_ATTRIBUTES_INPUT_INDEX).when(inputContext).getInputIndex();
+    doReturn(TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER).when(inputContext).getDAGAttemptNumber();
+    doReturn(TEST_ATTRIBUTES_DAG_NAME).when(inputContext).getDAGName();
+    doReturn(TEST_ATTRIBUTES_VERTEX_NAME).when(inputContext).getTaskVertexName();
+    doReturn(TEST_ATTRIBUTES_INPUT_NAME).when(inputContext).getSourceVertexName();
+    doReturn(TEST_ATTRIBUTES_APPLICATION_ID).when(inputContext).getApplicationId();
+    doReturn(TEST_ATTRIBUTES_UNIQUE_IDENTIFIER).when(inputContext).getUniqueIdentifier();
+
+    DataSourceDescriptor dsd = MRInput.createConfigBuilder(new Configuration(false),
+        TestInputFormat.class).groupSplits(false).build();
+    doReturn(dsd.getInputDescriptor().getUserPayload()).when(inputContext).getUserPayload();
+    doReturn(new TezCounters()).when(inputContext).getCounters();
+
+    MRInput mrInput = new MRInput(inputContext, 1);
+    mrInput.initialize();
+
+    // Deliver a single split through an InputDataInformationEvent. This is expected to make
+    // MRInput call TestInputFormat#getRecordReader, which checks the JobConf contents.
+    MRRuntimeProtos.MRSplitProto splitProto = MRRuntimeProtos.MRSplitProto.newBuilder()
+        .setSplitClassName(TestInputSplit.class.getName())
+        .build();
+    InputDataInformationEvent diEvent = InputDataInformationEvent
+        .createWithSerializedPayload(0, splitProto.toByteString().asReadOnlyByteBuffer());
+
+    List<Event> events = new LinkedList<>();
+    events.add(diEvent);
+    mrInput.handleEvents(events);
+
+    assertTrue("TestInputFormat#getRecordReader was not invoked",
+        TestInputFormat.invoked.get());
+  }
+
+
+  /**
+   * Old-API {@link InputFormat} used by testAttributesInJobConf. Its
+   * {@link #getRecordReader} asserts that every attribute exposed through
+   * {@link MRInputHelpers} is present in the JobConf with the value returned by the mocked
+   * InputContext. It sets {@code invoked} only after all of those assertions have passed.
+   */
+  static class TestInputFormat implements InputFormat<Object, Object> {
+
+    // Static so the enclosing test can observe the call. Nothing here ever clears it, so a
+    // test relying on it should reset it first.
+    private static final AtomicBoolean invoked = new AtomicBoolean(false);
+
+    @Override
+    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+      // The test supplies its split through an event, so no splits are generated here.
+      // Return an empty array rather than null so an unexpected caller does not hit an NPE.
+      return new InputSplit[0];
+    }
+
+    @Override
+    public RecordReader<Object, Object> getRecordReader(InputSplit split, JobConf job,
+        Reporter reporter) throws IOException {
+      assertEquals(TEST_ATTRIBUTES_DAG_NAME, MRInputHelpers.getDagName(job));
+      assertEquals(TEST_ATTRIBUTES_VERTEX_NAME, MRInputHelpers.getVertexName(job));
+      assertEquals(TEST_ATTRIBUTES_INPUT_NAME, MRInputHelpers.getInputName(job));
+      assertEquals(TEST_ATTRIBUTES_DAG_INDEX, MRInputHelpers.getDagIndex(job));
+      assertEquals(TEST_ATTRIBUTES_VERTEX_INDEX, MRInputHelpers.getVertexIndex(job));
+      assertEquals(TEST_ATTRIBUTES_APPLICATION_ID.toString(),
+          MRInputHelpers.getApplicationIdString(job));
+      assertEquals(TEST_ATTRIBUTES_UNIQUE_IDENTIFIER, MRInputHelpers.getUniqueIdentifier(job));
+      assertEquals(TEST_ATTRIBUTES_TASK_INDEX, MRInputHelpers.getTaskIndex(job));
+      assertEquals(TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX, MRInputHelpers.getTaskAttemptIndex(job));
+      assertEquals(TEST_ATTRIBUTES_INPUT_INDEX, MRInputHelpers.getInputIndex(job));
+      assertEquals(TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER, MRInputHelpers.getDagAttemptNumber(job));
+      // Reached only if every assertion above passed.
+      invoked.set(true);
+
+      // An empty reader: the test only needs the JobConf checks above to run.
+      return new RecordReader<Object, Object>() {
+        @Override
+        public boolean next(Object key, Object value) throws IOException {
+          return false;
+        }
+
+        @Override
+        public Object createKey() {
+          return null;
+        }
+
+        @Override
+        public Object createValue() {
+          return null;
+        }
+
+        @Override
+        public long getPos() throws IOException {
+          return 0;
+        }
+
+        @Override
+        public void close() throws IOException {
+          // Nothing to release.
+        }
+
+        @Override
+        public float getProgress() throws IOException {
+          return 0;
+        }
+      };
+    }
+  }
+
+  /**
+   * A data-less old-API split. testAttributesInJobConf places its class name in the
+   * MRSplitProto it sends to MRInput. The split has no length, no locations and no
+   * serialized state.
+   */
+  public static class TestInputSplit implements InputSplit {
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      // No fields to serialize.
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      // No fields to deserialize.
+    }
+
+    @Override
+    public long getLength() throws IOException {
+      return 0L;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+      return new String[] {};
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index db5643e..1733bfc 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -33,6 +33,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
+import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -312,6 +313,7 @@ public class TestMultiMRInput {
doReturn(1).when(inputContext).getTaskAttemptNumber();
doReturn(1).when(inputContext).getTaskIndex();
doReturn(1).when(inputContext).getTaskVertexIndex();
+ doReturn(UUID.randomUUID().toString()).when(inputContext).getUniqueIdentifier();
doReturn("taskVertexName").when(inputContext).getTaskVertexName();
doReturn(UserPayload.create(ByteBuffer.wrap(payload))).when(inputContext).getUserPayload();
return inputContext;