You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/12/14 03:54:56 UTC
tez git commit: TEZ-2684. ShuffleVertexManager.parsePartitionStats
throws IllegalStateException: Stats should be initialized. (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 62e4b6ea1 -> 282f4e13e
TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized. (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/282f4e13
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/282f4e13
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/282f4e13
Branch: refs/heads/branch-0.7
Commit: 282f4e13e7859e1e4e6d3918beb7c4670f13127b
Parents: 62e4b6e
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Mon Dec 14 08:24:44 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Mon Dec 14 08:24:44 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/impl/TestVertexImpl.java | 78 ++++++++++++++++++++
.../vertexmanager/ShuffleVertexManager.java | 7 +-
3 files changed, 85 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/282f4e13/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 023be60..f6f028c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized.
TEZ-2496. Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source.
TEZ-2995. Timeline primary filter should only be on callerId and not type.
TEZ-2943. Change shuffle vertex manager to use per vertex data for auto
http://git-wip-us.apache.org/repos/asf/tez/blob/282f4e13/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 7db5bd9..d08d011 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -49,8 +49,13 @@ import java.util.concurrent.locks.ReentrantLock;
import com.google.protobuf.ByteString;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -5763,6 +5768,79 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
+ public void testTez2684() throws TezException, IOException { // Regression test for TEZ-2684: a VertexManagerEvent reaches vertex C before C is started.
+ setupPreDagCreation();
+ dagPlan = createSamplerDAGPlan2(); // Plan builder is defined elsewhere in this class; vertices "A", "B" and "C" are looked up from it below.
+ setupPostDagCreation();
+
+ VertexImpl vA = vertices.get("A");
+ VertexImpl vB = vertices.get("B");
+ VertexImpl vC = vertices.get("C");
+
+ // Init and start vertex A only; A must be RUNNING while B and C are still NEW.
+ dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+ VertexEventType.V_INIT));
+ dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+ VertexEventType.V_START));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.RUNNING, vA.getState());
+ Assert.assertEquals(VertexState.NEW, vB.getState());
+ Assert.assertEquals(VertexState.NEW, vC.getState());
+
+ // Send V_INIT to B only; the test expects both B and C to be INITED afterwards (presumably C's init follows from the DAG plan -- confirm).
+ dispatcher.getEventHandler().handle(new VertexEvent(vB.getVertexId(),
+ VertexEventType.V_INIT));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.INITED, vB.getState());
+ Assert.assertEquals(VertexState.INITED, vC.getState());
+
+ // Build a VertexManagerEvent addressed to "C" and route it to C while C is still INITED (not yet started).
+ long[] sizes = new long[]{(100 * 1000l * 1000l)}; // single entry with value 100,000,000
+ Event vmEvent = getVertexManagerEvent(sizes, 1060000000, "C");
+
+ TezTaskAttemptID taId = TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(vC.getVertexId(), 1), 1); // IDs derived from C's own vertex id; both numeric arguments are 1
+ EventMetaData sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT, "C", "C", taId);
+ TezEvent tezEvent = new TezEvent(vmEvent, sourceInfo);
+ dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vC.getVertexId(),
+ Lists.newArrayList(tezEvent)));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.INITED, vC.getState()); // routing the event must leave C in INITED
+
+ // Start B; C is then expected to reach RUNNING, i.e. the event delivered earlier must not have derailed it.
+ dispatcher.getEventHandler().handle(new VertexEvent(vB.getVertexId(), VertexEventType.V_START));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.RUNNING, vC.getState());
+
+ }
+
+ VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName) // Test helper: builds a VertexManagerEvent for vertexName whose payload is a VertexManagerEventPayloadProto.
+ throws IOException {
+ ByteBuffer payload = null;
+ if (sizes != null) { // with sizes: payload carries both the output size and the partition stats
+ RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput(sizes);
+ DataOutputBuffer dout = new DataOutputBuffer();
+ partitionStats.serialize(dout);
+ ByteString
+ partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData()); // NOTE(review): getData() passes the whole backing array, not just getLength() bytes -- confirm trailing bytes are harmless to the reader
+ payload =
+ ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder()
+ .setOutputSize(totalSize)
+ .setPartitionStats(partitionStatsBytes)
+ .build().toByteString()
+ .asReadOnlyByteBuffer();
+ } else { // sizes == null: payload carries only the output size, no partition stats field is set
+ payload =
+ ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder()
+ .setOutputSize(totalSize)
+ .build().toByteString()
+ .asReadOnlyByteBuffer();
+ }
+ VertexManagerEvent vmEvent = VertexManagerEvent.create(vertexName, payload); // first argument is the name of the vertex the event is created for
+ return vmEvent;
+ }
+
+ @Test(timeout = 5000)
public void testExceptionFromVM_Initialize() throws TezException {
useCustomInitializer = true;
setupPreDagCreation();
http://git-wip-us.apache.org/repos/asf/tez/blob/282f4e13/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 1950df2..c88c7a2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -622,8 +622,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
void updatePendingTasks() {
+ int tasks = getContext().getVertexNumTasks(getContext().getVertexName());
+ if (tasks == pendingTasks.size() || tasks <= 0) {
+ return;
+ }
pendingTasks.clear();
- for (int i = 0; i < getContext().getVertexNumTasks(getContext().getVertexName()); ++i) {
+ for (int i = 0; i < tasks; ++i) {
pendingTasks.add(new PendingTaskInfo(i));
}
totalTasksToSchedule = pendingTasks.size();
@@ -1004,6 +1008,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
+ " desiredTaskIput:" + desiredTaskInputDataSize + " minTasks:"
+ minTaskParallelism);
+ updatePendingTasks();
if (enableAutoParallelism) {
getContext().vertexReconfigurationPlanned();
}