You are viewing a plain-text version of this content. (The hyperlink to its canonical version is not preserved in this copy.)
Posted to commits@tez.apache.org by hi...@apache.org on 2013/11/04 22:14:53 UTC
git commit: TEZ-248. 0 number of tasks causes Arithmetic exception.
(hitesh)
Updated Branches:
refs/heads/master ae06ee895 -> a153a7762
TEZ-248. 0 number of tasks causes Arithmetic exception. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a153a776
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a153a776
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a153a776
Branch: refs/heads/master
Commit: a153a7762d70c01d644047894a843a996f25c74b
Parents: ae06ee8
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon Nov 4 13:14:32 2013 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon Nov 4 13:14:32 2013 -0800
----------------------------------------------------------------------
.../java/org/apache/tez/dag/api/Vertex.java | 5 +--
.../org/apache/tez/dag/api/TestDAGVerify.java | 12 +++++--
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 31 +++++++++++++---
.../tez/mapreduce/examples/ExampleDriver.java | 17 ++++++---
.../input/BroadcastShuffleManager.java | 2 +-
.../runtime/library/input/LocalMergedInput.java | 9 ++++-
.../library/input/ShuffledMergedInput.java | 35 +++++++++++++++++-
.../input/ShuffledMergedInputLegacy.java | 33 +++++++++++++++++
.../library/input/ShuffledUnorderedKVInput.java | 38 ++++++++++++++++----
9 files changed, 159 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 673430d..065d98b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -58,9 +58,10 @@ public class Vertex { // FIXME rename to Task
this.processorDescriptor = processorDescriptor;
this.parallelism = parallelism;
this.taskResource = taskResource;
- if (!(parallelism == -1 || parallelism > 0)) {
+ if (parallelism < -1) {
throw new IllegalArgumentException(
- "Parallelism should be -1 if determined by the AM, otherwise should be > 0");
+ "Parallelism should be -1 if determined by the AM"
+ + ", otherwise should be >= 0");
}
if (taskResource == null) {
throw new IllegalArgumentException("Resource cannot be null");
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 14c510a..81718bc 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -421,17 +421,25 @@ public class TestDAGVerify {
@SuppressWarnings("unused")
@Test
public void testInvalidVertexConstruction() {
+ {
+ Vertex v1 = new Vertex("v1",
+ new ProcessorDescriptor("MapProcessor"),
+ 0, dummyTaskResource);
+ Vertex v2 = new Vertex("v1",
+ new ProcessorDescriptor("MapProcessor"),
+ -1, dummyTaskResource);
+ }
try {
Vertex v1 = new Vertex("v1",
new ProcessorDescriptor("MapProcessor"),
- 0, dummyTaskResource);
+ -2, dummyTaskResource);
Assert.fail("Expected exception for 0 parallelism");
} catch (IllegalArgumentException e) {
Assert
.assertTrue(e
.getMessage()
.startsWith(
- "Parallelism should be -1 if determined by the AM, otherwise should be > 0"));
+ "Parallelism should be -1 if determined by the AM, otherwise should be >= 0"));
}
try {
Vertex v1 = new Vertex("v1",
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b9e7ae7..6c88467 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -281,6 +281,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
.addTransition(VertexState.RUNNING, VertexState.RUNNING,
VertexEventType.V_TASK_RESCHEDULED,
new TaskRescheduledTransition())
+ .addTransition(VertexState.RUNNING,
+ EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED,
+ VertexState.FAILED),
+ VertexEventType.V_COMPLETED,
+ new VertexNoTasksCompletedTransition())
.addTransition(
VertexState.RUNNING,
VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
@@ -1058,10 +1063,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
/**
* Set the terminationCause and send a kill-message to all tasks.
* The task-kill messages are only sent once.
- * @param the trigger that is causing the Vertex to transition to KILLED/FAILED
- * @param event The type of kill event to send to the vertices.
*/
- void enactKill(VertexTerminationCause trigger, TaskTerminationCause taskterminationCause) {
+ void enactKill(VertexTerminationCause trigger,
+ TaskTerminationCause taskterminationCause) {
if(trySetTerminationCause(trigger)){
for (Task task : tasks.values()) {
eventHandler.handle(
@@ -1131,6 +1135,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// TODO: Metrics
//vertex.metrics.endPreparingJob(job);
initedTime = clock.getTime();
+
return VertexState.INITED;
}
@@ -1255,7 +1260,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Create tasks based on initial configuration, but don't start them yet.
if (vertex.numTasks == -1) {
- LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers to set #tasks for the vertex");
+ LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers"
+ + " to set #tasks for the vertex " + vertex.getVertexId());
} else {
vertex.createTasks();
}
@@ -1405,6 +1411,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
eventHandler.handle(new VertexEventSourceVertexStarted(targetVertex
.getVertexId(), distanceFromRoot));
}
+
+ // If we have no tasks, just transition to vertex completed
+ if (this.numTasks == 0) {
+ eventHandler.handle(new VertexEvent(
+ this.vertexId, VertexEventType.V_COMPLETED));
+ }
}
private void abortVertex(VertexStatus.State finalState) {
@@ -1626,7 +1638,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.succeededTaskCount--;
}
}
-
+
+ static class VertexNoTasksCompletedTransition implements
+ MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+ @Override
+ public VertexState transition(VertexImpl vertex, VertexEvent event) {
+ return VertexImpl.checkVertexForCompletion(vertex);
+ }
+ }
+
private static class TaskRescheduledAfterVertexSuccessTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index cb7abe1..5299431 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -87,24 +87,31 @@ public class ExampleDriver {
public static void printMRRDAGStatus(DAGStatus dagStatus) {
Progress progress = dagStatus.getDAGProgress();
+ double vProgressFloat = 0.0f;
if (progress != null) {
System.out.println("");
System.out.println("DAG: State: "
+ dagStatus.getState()
+ " Progress: "
- + formatter.format((double)(progress.getSucceededTaskCount())
- /progress.getTotalTaskCount()));
+ + (progress.getTotalTaskCount() < 0 ? formatter.format(0.0f) :
+ formatter.format((double)(progress.getSucceededTaskCount())
+ /progress.getTotalTaskCount())));
final String[] vNames = { "initialmap", "ivertex1", "finalreduce" };
for (String vertexName : vNames) {
Progress vProgress = dagStatus.getVertexProgress().get(vertexName);
if (vProgress != null) {
+ vProgressFloat = 0.0f;
+ if (vProgress.getTotalTaskCount() == 0) {
+ vProgressFloat = 1.0f;
+ } else if (vProgress.getTotalTaskCount() > 0) {
+ vProgressFloat = (double)vProgress.getSucceededTaskCount()
+ /vProgress.getTotalTaskCount();
+ }
System.out.println("VertexStatus:"
+ " VertexName: "
+ (vertexName.equals("ivertex1") ? "intermediate-reducer"
: vertexName)
- + " Progress: "
- + formatter.format((double)vProgress.getSucceededTaskCount()
- /vProgress.getTotalTaskCount()));
+ + " Progress: " + formatter.format(vProgressFloat));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 717f8ab..7246359 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -553,7 +553,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
/////////////////// End of methods for walking the available inputs
@SuppressWarnings("rawtypes")
- public BroadcastKVReader craeteReader() throws IOException {
+ public BroadcastKVReader createReader() throws IOException {
return new BroadcastKVReader(this, conf, codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
index 3aec247..683578e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
@@ -38,6 +38,10 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
this.inputContext = inputContext;
this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+ if (numInputs == 0) {
+ return Collections.emptyList();
+ }
+
LocalShuffle localShuffle = new LocalShuffle(inputContext, conf, numInputs);
rawIter = localShuffle.run();
createValuesIterator();
@@ -46,7 +50,10 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
@Override
public List<Event> close() throws IOException {
- rawIter.close();
+ if (numInputs != 0) {
+ rawIter.close();
+ }
return Collections.emptyList();
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index da152b8..6a01564 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -69,6 +69,10 @@ public class ShuffledMergedInput implements LogicalInput {
this.inputContext = inputContext;
this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+ if (this.numInputs == 0) {
+ return Collections.emptyList();
+ }
+
this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
this.inputValueCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
@@ -89,6 +93,9 @@ public class ShuffledMergedInput implements LogicalInput {
* still in progress
*/
public boolean isInputReady() {
+ if (this.numInputs == 0) {
+ return true;
+ }
return shuffle.isInputReady();
}
@@ -98,13 +105,18 @@ public class ShuffledMergedInput implements LogicalInput {
* @throws InterruptedException
*/
public void waitForInputReady() throws IOException, InterruptedException {
+ if (this.numInputs == 0) {
+ return;
+ }
rawIter = shuffle.waitForInput();
createValuesIterator();
}
@Override
public List<Event> close() throws IOException {
- rawIter.close();
+ if (this.numInputs != 0) {
+ rawIter.close();
+ }
return Collections.emptyList();
}
@@ -122,6 +134,24 @@ public class ShuffledMergedInput implements LogicalInput {
*/
@Override
public KeyValuesReader getReader() throws IOException {
+ if (this.numInputs == 0) {
+ return new KeyValuesReader() {
+ @Override
+ public boolean next() throws IOException {
+ return false;
+ }
+
+ @Override
+ public Object getCurrentKey() throws IOException {
+ throw new RuntimeException("No data available in Input");
+ }
+
+ @Override
+ public Iterable<Object> getCurrentValues() throws IOException {
+ throw new RuntimeException("No data available in Input");
+ }
+ };
+ }
if (rawIter == null) {
try {
waitForInputReady();
@@ -150,6 +180,9 @@ public class ShuffledMergedInput implements LogicalInput {
@Override
public void handleEvents(List<Event> inputEvents) {
+ if (numInputs == 0) {
+ throw new RuntimeException("No input events expected as numInputs is 0");
+ }
shuffle.handleEvents(inputEvents);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
index 83332db..3bcc957 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
@@ -22,14 +22,47 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
@LimitedPrivate("mapreduce")
public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
+ private final Progress progress = new Progress();
+
@Private
public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
// wait for input so that iterator is available
+ if (this.numInputs == 0) {
+ return new TezRawKeyValueIterator() {
+ @Override
+ public DataInputBuffer getKey() throws IOException {
+ throw new RuntimeException("No data available in Input");
+ }
+
+ @Override
+ public DataInputBuffer getValue() throws IOException {
+ throw new RuntimeException("No data available in Input");
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public Progress getProgress() {
+ progress.complete();
+ return progress;
+ }
+ };
+ }
+
waitForInputReady();
return rawIter;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index c6ba6b1..147ba09 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -45,8 +45,6 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
@SuppressWarnings("rawtypes")
private BroadcastKVReader kvReader;
-
-
public ShuffledUnorderedKVInput() {
}
@@ -55,26 +53,54 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
-
- this.shuffleManager = new BroadcastShuffleManager(inputContext, conf, numInputs);
+
+ if (numInputs == 0) {
+ return null;
+ }
+
+ this.shuffleManager = new BroadcastShuffleManager(inputContext, conf,
+ numInputs);
this.shuffleManager.run();
- this.kvReader = this.shuffleManager.craeteReader();
+ this.kvReader = this.shuffleManager.createReader();
return null;
}
@Override
public KeyValueReader getReader() throws Exception {
+ if (numInputs == 0) {
+ return new KeyValueReader() {
+ @Override
+ public boolean next() throws IOException {
+ return false;
+ }
+
+ @Override
+ public Object getCurrentKey() throws IOException {
+ throw new RuntimeException("No data available in Input");
+ }
+
+ @Override
+ public Object getCurrentValue() throws IOException {
+ throw new RuntimeException("No data available in Input");
+ }
+ };
+ }
return this.kvReader;
}
@Override
public void handleEvents(List<Event> inputEvents) throws IOException {
+ if (numInputs == 0) {
+ throw new RuntimeException("No input events expected as numInputs is 0");
+ }
shuffleManager.handleEvents(inputEvents);
}
@Override
public List<Event> close() throws Exception {
- this.shuffleManager.shutdown();
+ if (numInputs != 0) {
+ this.shuffleManager.shutdown();
+ }
return null;
}