You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/02/18 10:58:09 UTC

[15/24] tez git commit: TEZ-3090. MRInput should make dagIdentifier, vertexIdentifier, etc available to the InputFormat jobConf. (Siddharth Seth)

TEZ-3090. MRInput should make dagIdentifier, vertexIdentifier, etc
available to the InputFormat jobConf. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/99c85d3f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/99c85d3f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/99c85d3f

Branch: refs/heads/TEZ-2980
Commit: 99c85d3f95ac9bbee9c507c4efdc2757ea5b8542
Parents: f352cfb
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Feb 10 20:37:50 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Feb 10 20:37:50 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/mapreduce/hadoop/MRInputHelpers.java    | 124 +++++++++++++++
 .../org/apache/tez/mapreduce/input/MRInput.java |  16 +-
 .../tez/mapreduce/input/base/MRInputBase.java   |  12 ++
 .../apache/tez/mapreduce/input/TestMRInput.java | 151 +++++++++++++++++++
 .../tez/mapreduce/input/TestMultiMRInput.java   |   2 +
 6 files changed, 305 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4d7ae6b..c769843 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3090. MRInput should make dagIdentifier, vertexIdentifier, etc available to the InputFormat jobConf.
   TEZ-3093. CriticalPathAnalyzer should be accessible via zeppelin.
   TEZ-3089. TaskConcurrencyAnalyzer can return negative task count with very large jobs.
   TEZ-2307. Possible wrong error message when submitting new dag

http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
index 30e4a8c..325e7b2 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
+import org.apache.tez.runtime.api.InputContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -707,4 +708,127 @@ public class MRInputHelpers {
     return UserPayload.create(userPayloadBuilder.build().toByteString().asReadOnlyByteBuffer());
   }
 
+
+  private static String getStringProperty(Configuration conf, String propertyName) {
+    Preconditions.checkNotNull(conf, "Configuration must be provided");
+    String propertyString = conf.get(propertyName);
+    Preconditions.checkNotNull(propertyString,
+        "Property " + propertyName + " not found in provided configuration");
+    return propertyString;
+  }
+
+  private static int getIntProperty(Configuration conf, String propertyName) {
+    return Integer.parseInt(getStringProperty(conf, propertyName));
+  }
+
+  /**
+   * @see {@link InputContext#getDagIdentifier}
+   * @param conf configuration instance
+   * @return dag index
+   */
+  @Public
+  public static int getDagIndex(Configuration conf) {
+    return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_INDEX);
+  }
+
+  /**
+   * @see {@link InputContext#getTaskVertexIndex}
+   * @param conf configuration instance
+   * @return vertex index
+   */
+  @Public
+  public static int getVertexIndex(Configuration conf) {
+    return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_VERTEX_INDEX);
+  }
+
+  /**
+   * @see {@link InputContext#getTaskIndex}
+   * @param conf configuration instance
+   * @return task index
+   */
+  @Public
+  public static int getTaskIndex(Configuration conf) {
+    return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_INDEX);
+  }
+
+  /**
+   * @see {@link InputContext#getTaskAttemptNumber}
+   * @param conf configuration instance
+   * @return task attempt index
+   */
+  @Public
+  public static int getTaskAttemptIndex(Configuration conf) {
+    return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_INDEX);
+  }
+
+  /**
+   * @see {@link InputContext#getInputIndex}
+   * @param conf configuration instance
+   * @return input index
+   */
+  @Public
+  public static int getInputIndex(Configuration conf) {
+    return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_INPUT_INDEX);
+  }
+
+  /**
+   * @see {@link InputContext#getDAGName}
+   * @param conf configuration instance
+   * @return dag name
+   */
+  @Public
+  public static String getDagName(Configuration conf) {
+    return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_NAME);
+  }
+
+  /**
+   * @see {@link InputContext#getTaskVertexName}
+   * @param conf configuration instance
+   * @return vertex name
+   */
+  @Public
+  public static String getVertexName(Configuration conf) {
+    return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_VERTEX_NAME);
+  }
+
+  /**
+   * @see {@link InputContext#getSourceVertexName}
+   * @param conf configuration instance
+   * @return source name
+   */
+  @Public
+  public static String getInputName(Configuration conf) {
+    return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_INPUT_NAME);
+  }
+
+  /**
+   * @see {@link InputContext#getApplicationId}
+   * @param conf configuration instance
+   * @return applicationId as a string
+   */
+  @Public
+  public static String getApplicationIdString(Configuration conf) {
+    return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_APPLICATION_ID);
+  }
+
+  /**
+   * @see {@link InputContext#getUniqueIdentifier}
+   * @param conf configuration instance
+   * @return unique identifier for the input
+   */
+  @Public
+  public static String getUniqueIdentifier(Configuration conf) {
+    return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_UNIQUE_IDENTIFIER);
+  }
+
+  /**
+   * @see {@link InputContext#getDAGAttemptNumber}
+   * @param conf configuration instance
+   * @return attempt number
+   */
+  @Public
+  public static int getDagAttemptNumber(Configuration conf) {
+    return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index b68d135..4a4ba86 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -82,7 +82,21 @@ import com.google.common.collect.Lists;
  */
 @Public
 public class MRInput extends MRInputBase {
-  
+
+  @Private public static final String TEZ_MAPREDUCE_DAG_INDEX = "tez.mapreduce.dag.index";
+  @Private public static final String TEZ_MAPREDUCE_DAG_NAME = "tez.mapreduce.dag.name";
+  @Private public static final String TEZ_MAPREDUCE_VERTEX_INDEX = "tez.mapreduce.vertex.index";
+  @Private public static final String TEZ_MAPREDUCE_VERTEX_NAME = "tez.mapreduce.vertex.name";
+  @Private public static final String TEZ_MAPREDUCE_TASK_INDEX = "tez.mapreduce.task.index";
+  @Private public static final String TEZ_MAPREDUCE_TASK_ATTEMPT_INDEX = "tez.mapreduce.task.attempt.index";
+  @Private public static final String TEZ_MAPREDUCE_INPUT_INDEX = "tez.mapreduce.input.index";
+  @Private public static final String TEZ_MAPREDUCE_INPUT_NAME = "tez.mapreduce.input.name";
+  @Private public static final String TEZ_MAPREDUCE_APPLICATION_ID = "tez.mapreduce.application.id";
+  @Private public static final String TEZ_MAPREDUCE_UNIQUE_IDENTIFIER = "tez.mapreduce.unique.identifier";
+  @Private public static final String TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER = "tez.mapreduce.dag.attempt.number";
+
+
+
   /**
    * Helper class to configure {@link MRInput}
    *

http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
index e4aa7e2..230f55e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
@@ -32,6 +32,7 @@ import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
@@ -96,6 +97,17 @@ public abstract class MRInputBase extends AbstractLogicalInput {
         taskAttemptId.toString());
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
         getContext().getDAGAttemptNumber());
+    jobConf.setInt(MRInput.TEZ_MAPREDUCE_DAG_INDEX, getContext().getDagIdentifier());
+    jobConf.setInt(MRInput.TEZ_MAPREDUCE_VERTEX_INDEX, getContext().getTaskVertexIndex());
+    jobConf.setInt(MRInput.TEZ_MAPREDUCE_TASK_INDEX, getContext().getTaskIndex());
+    jobConf.setInt(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_INDEX, getContext().getTaskAttemptNumber());
+    jobConf.set(MRInput.TEZ_MAPREDUCE_DAG_NAME, getContext().getDAGName());
+    jobConf.set(MRInput.TEZ_MAPREDUCE_VERTEX_NAME, getContext().getTaskVertexName());
+    jobConf.setInt(MRInput.TEZ_MAPREDUCE_INPUT_INDEX, getContext().getInputIndex());
+    jobConf.set(MRInput.TEZ_MAPREDUCE_INPUT_NAME, getContext().getSourceVertexName());
+    jobConf.set(MRInput.TEZ_MAPREDUCE_APPLICATION_ID, getContext().getApplicationId().toString());
+    jobConf.set(MRInput.TEZ_MAPREDUCE_UNIQUE_IDENTIFIER, getContext().getUniqueIdentifier());
+    jobConf.setInt(MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER, getContext().getDAGAttemptNumber());
 
     this.inputRecordCounter = getContext().getCounters().findCounter(
         TaskCounter.INPUT_RECORDS_PROCESSED);

http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
index 448b90c..b42ef25 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
@@ -14,6 +14,7 @@
 
 package org.apache.tez.mapreduce.input;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -22,17 +23,28 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.junit.Test;
 
 public class TestMRInput {
@@ -47,6 +59,10 @@ public class TestMRInput {
     ApplicationId applicationId = ApplicationId.newInstance(1000, 1);
     doReturn(dsd.getInputDescriptor().getUserPayload()).when(inputContext).getUserPayload();
     doReturn(applicationId).when(inputContext).getApplicationId();
+    doReturn("dagName").when(inputContext).getDAGName();
+    doReturn("vertexName").when(inputContext).getTaskVertexName();
+    doReturn("inputName").when(inputContext).getSourceVertexName();
+    doReturn("uniqueIdentifier").when(inputContext).getUniqueIdentifier();
     doReturn(1).when(inputContext).getTaskIndex();
     doReturn(1).when(inputContext).getTaskAttemptNumber();
     doReturn(new TezCounters()).when(inputContext).getCounters();
@@ -69,4 +85,139 @@ public class TestMRInput {
       assertTrue(e instanceof IllegalStateException);
     }
   }
+
+  private static final String TEST_ATTRIBUTES_DAG_NAME = "dagName";
+  private static final String TEST_ATTRIBUTES_VERTEX_NAME = "vertexName";
+  private static final String TEST_ATTRIBUTES_INPUT_NAME = "inputName";
+  private static final ApplicationId TEST_ATTRIBUTES_APPLICATION_ID = ApplicationId.newInstance(0, 0);
+  private static final String TEST_ATTRIBUTES_UNIQUE_IDENTIFIER = "uniqueId";
+  private static final int TEST_ATTRIBUTES_DAG_INDEX = 1000;
+  private static final int TEST_ATTRIBUTES_VERTEX_INDEX = 2000;
+  private static final int TEST_ATTRIBUTES_TASK_INDEX = 3000;
+  private static final int TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX = 4000;
+  private static final int TEST_ATTRIBUTES_INPUT_INDEX = 5000;
+  private static final int TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER = 6000;
+
+  @Test(timeout = 5000)
+  public void testAttributesInJobConf() throws Exception {
+    InputContext inputContext = mock(InputContext.class);
+    doReturn(TEST_ATTRIBUTES_DAG_INDEX).when(inputContext).getDagIdentifier();
+    doReturn(TEST_ATTRIBUTES_VERTEX_INDEX).when(inputContext).getTaskVertexIndex();
+    doReturn(TEST_ATTRIBUTES_TASK_INDEX).when(inputContext).getTaskIndex();
+    doReturn(TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX).when(inputContext).getTaskAttemptNumber();
+    doReturn(TEST_ATTRIBUTES_INPUT_INDEX).when(inputContext).getInputIndex();
+    doReturn(TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER).when(inputContext).getDAGAttemptNumber();
+    doReturn(TEST_ATTRIBUTES_DAG_NAME).when(inputContext).getDAGName();
+    doReturn(TEST_ATTRIBUTES_VERTEX_NAME).when(inputContext).getTaskVertexName();
+    doReturn(TEST_ATTRIBUTES_INPUT_NAME).when(inputContext).getSourceVertexName();
+    doReturn(TEST_ATTRIBUTES_APPLICATION_ID).when(inputContext).getApplicationId();
+    doReturn(TEST_ATTRIBUTES_UNIQUE_IDENTIFIER).when(inputContext).getUniqueIdentifier();
+
+
+    DataSourceDescriptor dsd = MRInput.createConfigBuilder(new Configuration(false),
+        TestInputFormat.class).groupSplits(false).build();
+
+    doReturn(dsd.getInputDescriptor().getUserPayload()).when(inputContext).getUserPayload();
+    doReturn(new TezCounters()).when(inputContext).getCounters();
+
+
+    MRInput mrInput = new MRInput(inputContext, 1);
+    mrInput.initialize();
+
+    MRRuntimeProtos.MRSplitProto splitProto =
+        MRRuntimeProtos.MRSplitProto.newBuilder().setSplitClassName(TestInputSplit.class.getName())
+            .build();
+    InputDataInformationEvent diEvent = InputDataInformationEvent
+        .createWithSerializedPayload(0, splitProto.toByteString().asReadOnlyByteBuffer());
+
+    List<Event> events = new LinkedList<>();
+    events.add(diEvent);
+    mrInput.handleEvents(events);
+    assertTrue(TestInputFormat.invoked.get());
+  }
+
+
+  /**
+   * Test InputFormat which verifies that the expected attributes are present in the JobConf.
+   */
+  static class TestInputFormat implements InputFormat {
+
+    private static final AtomicBoolean invoked = new AtomicBoolean(false);
+
+    @Override
+    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+      return null;
+    }
+
+    @Override
+    public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws
+        IOException {
+      assertEquals(TEST_ATTRIBUTES_DAG_NAME, MRInputHelpers.getDagName(job));
+      assertEquals(TEST_ATTRIBUTES_VERTEX_NAME, MRInputHelpers.getVertexName(job));
+      assertEquals(TEST_ATTRIBUTES_INPUT_NAME, MRInputHelpers.getInputName(job));
+      assertEquals(TEST_ATTRIBUTES_DAG_INDEX, MRInputHelpers.getDagIndex(job));
+      assertEquals(TEST_ATTRIBUTES_VERTEX_INDEX, MRInputHelpers.getVertexIndex(job));
+      assertEquals(TEST_ATTRIBUTES_APPLICATION_ID.toString(), MRInputHelpers.getApplicationIdString(job));
+      assertEquals(TEST_ATTRIBUTES_UNIQUE_IDENTIFIER, MRInputHelpers.getUniqueIdentifier(job));
+      assertEquals(TEST_ATTRIBUTES_TASK_INDEX, MRInputHelpers.getTaskIndex(job));
+      assertEquals(TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX, MRInputHelpers.getTaskAttemptIndex(job));
+      assertEquals(TEST_ATTRIBUTES_INPUT_INDEX, MRInputHelpers.getInputIndex(job));
+      assertEquals(TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER, MRInputHelpers.getDagAttemptNumber(job));
+      invoked.set(true);
+      return new RecordReader() {
+        @Override
+        public boolean next(Object key, Object value) throws IOException {
+          return false;
+        }
+
+        @Override
+        public Object createKey() {
+          return null;
+        }
+
+        @Override
+        public Object createValue() {
+          return null;
+        }
+
+        @Override
+        public long getPos() throws IOException {
+          return 0;
+        }
+
+        @Override
+        public void close() throws IOException {
+
+        }
+
+        @Override
+        public float getProgress() throws IOException {
+          return 0;
+        }
+      };
+    }
+  }
+
+  public static class TestInputSplit implements InputSplit {
+
+    @Override
+    public long getLength() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+      return new String[0];
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index db5643e..1733bfc 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -33,6 +33,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -312,6 +313,7 @@ public class TestMultiMRInput {
     doReturn(1).when(inputContext).getTaskAttemptNumber();
     doReturn(1).when(inputContext).getTaskIndex();
     doReturn(1).when(inputContext).getTaskVertexIndex();
+    doReturn(UUID.randomUUID().toString()).when(inputContext).getUniqueIdentifier();
     doReturn("taskVertexName").when(inputContext).getTaskVertexName();
     doReturn(UserPayload.create(ByteBuffer.wrap(payload))).when(inputContext).getUserPayload();
     return inputContext;