You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/08/05 09:16:05 UTC

tez git commit: TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master 1864e4b23 -> cc1d89cba


TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cc1d89cb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cc1d89cb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cc1d89cb

Branch: refs/heads/master
Commit: cc1d89cba984a9c43e6a966b2c822355b6c85c07
Parents: 1864e4b
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Wed Aug 5 12:49:23 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Wed Aug 5 12:49:23 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 74 ++++++++++++++++++++
 .../vertexmanager/ShuffleVertexManager.java     | 11 ++-
 3 files changed, 83 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/cc1d89cb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26ce333..c1a4e31 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@ INCOMPATIBLE CHANGES
   TEZ-2647. Add input causality dependency for attempts
 
 ALL CHANGES:
+  TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized.
   TEZ-2172. FetcherOrderedGrouped using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.
   TEZ-2613. Fetcher(unordered) using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.
   TEZ-2645. Provide standard analyzers for job analysis.

http://git-wip-us.apache.org/repos/asf/tez/blob/cc1d89cb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index c55ea23..3c0dd1e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -47,7 +47,12 @@ import java.util.concurrent.locks.ReentrantLock;
 import com.google.protobuf.ByteString;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.runtime.api.VertexStatistics;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -5575,6 +5580,75 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
+  public void testTez2684() throws AMUserCodeException, IOException {
+    setupPreDagCreation();
+    dagPlan = createSamplerDAGPlan2();
+    setupPostDagCreation();
+
+    VertexImpl vA = vertices.get("A");
+    VertexImpl vB = vertices.get("B");
+    VertexImpl vC = vertices.get("C");
+
+    //vA init & start
+    dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+        VertexEventType.V_INIT));
+    dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+        VertexEventType.V_START));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, vA.getState());
+    Assert.assertEquals(VertexState.NEW, vB.getState());
+    Assert.assertEquals(VertexState.NEW, vC.getState());
+
+    //vB init
+    dispatcher.getEventHandler().handle(new VertexEvent(vB.getVertexId(),
+        VertexEventType.V_INIT));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITED, vB.getState());
+    Assert.assertEquals(VertexState.INITED, vC.getState());
+
+    //Send VertexManagerEvent
+    long[] sizes = new long[]{(100 * 1000l * 1000l)};
+    Event vmEvent = getVertexManagerEvent(sizes, 1060000000, "C");
+    TezEvent tezEvent = new TezEvent(vmEvent, null);
+    dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vC.getVertexId(),
+        Lists.newArrayList(tezEvent)));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITED, vC.getState());
+
+    //vB start
+    dispatcher.getEventHandler().handle(new VertexEvent(vB.getVertexId(), VertexEventType.V_START));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, vC.getState());
+
+  }
+
+  VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName)
+      throws IOException {
+    ByteBuffer payload = null;
+    if (sizes != null) {
+      RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput(sizes);
+      DataOutputBuffer dout = new DataOutputBuffer();
+      partitionStats.serialize(dout);
+      ByteString
+          partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData());
+      payload =
+          ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder()
+              .setOutputSize(totalSize)
+              .setPartitionStats(partitionStatsBytes)
+              .build().toByteString()
+              .asReadOnlyByteBuffer();
+    } else {
+      payload =
+          ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder()
+              .setOutputSize(totalSize)
+              .build().toByteString()
+              .asReadOnlyByteBuffer();
+    }
+    VertexManagerEvent vmEvent = VertexManagerEvent.create(vertexName, payload);
+    return vmEvent;
+  }
+
+  @Test(timeout = 5000)
   public void testExceptionFromVM_Initialize() throws AMUserCodeException {
     useCustomInitializer = true;
     setupPreDagCreation();

http://git-wip-us.apache.org/repos/asf/tez/blob/cc1d89cb/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 0aef8bd..6c3e3f8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -589,8 +589,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
 
 
   void updatePendingTasks() {
+    int tasks = getContext().getVertexNumTasks(getContext().getVertexName());
+    if (tasks == pendingTasks.size() || tasks <= 0) {
+      return;
+    }
     pendingTasks.clear();
-    for (int i=0; i<getContext().getVertexNumTasks(getContext().getVertexName()); ++i) {
+    for (int i = 0; i < tasks; ++i) {
       pendingTasks.add(new PendingTaskInfo(i));
     }
     totalTasksToSchedule = pendingTasks.size();
@@ -801,7 +805,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     boolean computedPartitionSizes = false;
     for (PendingTaskInfo taskInfo : pendingTasks) {
       int index = taskInfo.index;
-      if (targetIndexes != null) { //things have been reconfigured.
+      if (targetIndexes != null) { //parallelism has changed.
         Preconditions.checkState(index < targetIndexes.length,
             "index=" + index +", targetIndexes length=" + targetIndexes.length);
         int[] mapping = targetIndexes[index];
@@ -957,7 +961,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
         + slowStartMaxSrcCompletionFraction + " auto:" + enableAutoParallelism
         + " desiredTaskIput:" + desiredTaskInputDataSize + " minTasks:"
         + minTaskParallelism);
-    
+
+    updatePendingTasks();
     if (enableAutoParallelism) {
       getContext().vertexReconfigurationPlanned();
     }