You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/11/04 22:14:53 UTC

git commit: TEZ-248. A vertex with 0 tasks causes an ArithmeticException. (hitesh)

Updated Branches:
  refs/heads/master ae06ee895 -> a153a7762


TEZ-248. A vertex with 0 tasks causes an ArithmeticException. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a153a776
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a153a776
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a153a776

Branch: refs/heads/master
Commit: a153a7762d70c01d644047894a843a996f25c74b
Parents: ae06ee8
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon Nov 4 13:14:32 2013 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon Nov 4 13:14:32 2013 -0800

----------------------------------------------------------------------
 .../java/org/apache/tez/dag/api/Vertex.java     |  5 +--
 .../org/apache/tez/dag/api/TestDAGVerify.java   | 12 +++++--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 31 +++++++++++++---
 .../tez/mapreduce/examples/ExampleDriver.java   | 17 ++++++---
 .../input/BroadcastShuffleManager.java          |  2 +-
 .../runtime/library/input/LocalMergedInput.java |  9 ++++-
 .../library/input/ShuffledMergedInput.java      | 35 +++++++++++++++++-
 .../input/ShuffledMergedInputLegacy.java        | 33 +++++++++++++++++
 .../library/input/ShuffledUnorderedKVInput.java | 38 ++++++++++++++++----
 9 files changed, 159 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 673430d..065d98b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -58,9 +58,10 @@ public class Vertex { // FIXME rename to Task
     this.processorDescriptor = processorDescriptor;
     this.parallelism = parallelism;
     this.taskResource = taskResource;
-    if (!(parallelism == -1 || parallelism > 0)) {
+    if (parallelism < -1) {
       throw new IllegalArgumentException(
-          "Parallelism should be -1 if determined by the AM, otherwise should be > 0");
+          "Parallelism should be -1 if determined by the AM"
+          + ", otherwise should be >= 0");
     }
     if (taskResource == null) {
       throw new IllegalArgumentException("Resource cannot be null");

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 14c510a..81718bc 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -421,17 +421,25 @@ public class TestDAGVerify {
   @SuppressWarnings("unused")
   @Test
   public void testInvalidVertexConstruction() {
+    {
+      Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor("MapProcessor"),
+        0, dummyTaskResource);
+      Vertex v2 = new Vertex("v1",
+        new ProcessorDescriptor("MapProcessor"),
+        -1, dummyTaskResource);
+    }
     try {
       Vertex v1 = new Vertex("v1",
           new ProcessorDescriptor("MapProcessor"),
-          0, dummyTaskResource);
+          -2, dummyTaskResource);
       Assert.fail("Expected exception for 0 parallelism");
     } catch (IllegalArgumentException e) {
       Assert
           .assertTrue(e
               .getMessage()
               .startsWith(
-                  "Parallelism should be -1 if determined by the AM, otherwise should be > 0"));
+                  "Parallelism should be -1 if determined by the AM, otherwise should be >= 0"));
     }
     try {
       Vertex v1 = new Vertex("v1",

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b9e7ae7..6c88467 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -281,6 +281,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.RUNNING, VertexState.RUNNING,
               VertexEventType.V_TASK_RESCHEDULED,
               new TaskRescheduledTransition())
+          .addTransition(VertexState.RUNNING,
+              EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED,
+                  VertexState.FAILED),
+              VertexEventType.V_COMPLETED,
+              new VertexNoTasksCompletedTransition())
           .addTransition(
               VertexState.RUNNING,
               VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
@@ -1058,10 +1063,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   /**
    * Set the terminationCause and send a kill-message to all tasks.
    * The task-kill messages are only sent once.
-   * @param the trigger that is causing the Vertex to transition to KILLED/FAILED
-   * @param event The type of kill event to send to the vertices.
    */
-  void enactKill(VertexTerminationCause trigger, TaskTerminationCause taskterminationCause) {
+  void enactKill(VertexTerminationCause trigger,
+      TaskTerminationCause taskterminationCause) {
     if(trySetTerminationCause(trigger)){
       for (Task task : tasks.values()) {
         eventHandler.handle(
@@ -1131,6 +1135,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     // TODO: Metrics
     //vertex.metrics.endPreparingJob(job);
     initedTime = clock.getTime();
+
     return VertexState.INITED;
   }
   
@@ -1255,7 +1260,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       
       // Create tasks based on initial configuration, but don't start them yet.
       if (vertex.numTasks == -1) {
-        LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers to set #tasks for the vertex");
+        LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers"
+            + " to set #tasks for the vertex " + vertex.getVertexId());
       } else {
         vertex.createTasks();
       }
@@ -1405,6 +1411,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       eventHandler.handle(new VertexEventSourceVertexStarted(targetVertex
           .getVertexId(), distanceFromRoot));
     }
+
+    // If we have no tasks, just transition to vertex completed
+    if (this.numTasks == 0) {
+      eventHandler.handle(new VertexEvent(
+        this.vertexId, VertexEventType.V_COMPLETED));
+    }
   }
 
   private void abortVertex(VertexStatus.State finalState) {
@@ -1626,7 +1638,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       vertex.succeededTaskCount--;
     }
   }
-  
+
+  static class VertexNoTasksCompletedTransition implements
+      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      return VertexImpl.checkVertexForCompletion(vertex);
+    }
+  }
+
   private static class TaskRescheduledAfterVertexSuccessTransition implements
     MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index cb7abe1..5299431 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -87,24 +87,31 @@ public class ExampleDriver {
 
   public static void printMRRDAGStatus(DAGStatus dagStatus) {
     Progress progress = dagStatus.getDAGProgress();
+    double vProgressFloat = 0.0f;
     if (progress != null) {
       System.out.println("");
       System.out.println("DAG: State: "
           + dagStatus.getState()
           + " Progress: "
-          + formatter.format((double)(progress.getSucceededTaskCount())
-              /progress.getTotalTaskCount()));
+          + (progress.getTotalTaskCount() < 0 ? formatter.format(0.0f) :
+            formatter.format((double)(progress.getSucceededTaskCount())
+              /progress.getTotalTaskCount())));
       final String[] vNames = { "initialmap", "ivertex1", "finalreduce" };
       for (String vertexName : vNames) {
         Progress vProgress = dagStatus.getVertexProgress().get(vertexName);
         if (vProgress != null) {
+          vProgressFloat = 0.0f;
+          if (vProgress.getTotalTaskCount() == 0) {
+            vProgressFloat = 1.0f;
+          } else if (vProgress.getTotalTaskCount() > 0) {
+            vProgressFloat = (double)vProgress.getSucceededTaskCount()
+              /vProgress.getTotalTaskCount();
+          }
           System.out.println("VertexStatus:"
               + " VertexName: "
               + (vertexName.equals("ivertex1") ? "intermediate-reducer"
                   : vertexName)
-              + " Progress: "
-              + formatter.format((double)vProgress.getSucceededTaskCount()
-                      /vProgress.getTotalTaskCount()));
+              + " Progress: " + formatter.format(vProgressFloat));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 717f8ab..7246359 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -553,7 +553,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
   /////////////////// End of methods for walking the available inputs
 
   @SuppressWarnings("rawtypes")
-  public BroadcastKVReader craeteReader() throws IOException {
+  public BroadcastKVReader createReader() throws IOException {
     return new BroadcastKVReader(this, conf, codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize);
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
index 3aec247..683578e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
@@ -38,6 +38,10 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
     this.inputContext = inputContext;
     this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
 
+    if (numInputs == 0) {
+      return Collections.emptyList();
+    }
+
     LocalShuffle localShuffle = new LocalShuffle(inputContext, conf, numInputs);
     rawIter = localShuffle.run();
     createValuesIterator();
@@ -46,7 +50,10 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
 
   @Override
   public List<Event> close() throws IOException {
-    rawIter.close();
+    if (numInputs != 0) {
+      rawIter.close();
+    }
     return Collections.emptyList();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index da152b8..6a01564 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -69,6 +69,10 @@ public class ShuffledMergedInput implements LogicalInput {
     this.inputContext = inputContext;
     this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
 
+    if (this.numInputs == 0) {
+      return Collections.emptyList();
+    }
+
     this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
     this.inputValueCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
     this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
@@ -89,6 +93,9 @@ public class ShuffledMergedInput implements LogicalInput {
    *         still in progress
    */
   public boolean isInputReady() {
+    if (this.numInputs == 0) {
+      return true;
+    }
     return shuffle.isInputReady();
   }
 
@@ -98,13 +105,18 @@ public class ShuffledMergedInput implements LogicalInput {
    * @throws InterruptedException
    */
   public void waitForInputReady() throws IOException, InterruptedException {
+    if (this.numInputs == 0) {
+      return;
+    }
     rawIter = shuffle.waitForInput();
     createValuesIterator();
   }
 
   @Override
   public List<Event> close() throws IOException {
-    rawIter.close();
+    if (this.numInputs != 0) {
+      rawIter.close();
+    }
     return Collections.emptyList();
   }
 
@@ -122,6 +134,24 @@ public class ShuffledMergedInput implements LogicalInput {
    */
   @Override
   public KeyValuesReader getReader() throws IOException {
+    if (this.numInputs == 0) {
+      return new KeyValuesReader() {
+        @Override
+        public boolean next() throws IOException {
+          return false;
+        }
+
+        @Override
+        public Object getCurrentKey() throws IOException {
+          throw new RuntimeException("No data available in Input");
+        }
+
+        @Override
+        public Iterable<Object> getCurrentValues() throws IOException {
+          throw new RuntimeException("No data available in Input");
+        }
+      };
+    }
     if (rawIter == null) {
       try {
         waitForInputReady();
@@ -150,6 +180,9 @@ public class ShuffledMergedInput implements LogicalInput {
 
   @Override
   public void handleEvents(List<Event> inputEvents) {
+    if (numInputs == 0) {
+      throw new RuntimeException("No input events expected as numInputs is 0");
+    }
     shuffle.handleEvents(inputEvents);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
index 83332db..3bcc957 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
@@ -22,14 +22,47 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 
 @LimitedPrivate("mapreduce")
 public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
 
+  private final Progress progress = new Progress();
+
   @Private
   public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
     // wait for input so that iterator is available
+    if (this.numInputs == 0) {
+      return new TezRawKeyValueIterator() {
+        @Override
+        public DataInputBuffer getKey() throws IOException {
+          throw new RuntimeException("No data available in Input");
+        }
+
+        @Override
+        public DataInputBuffer getValue() throws IOException {
+          throw new RuntimeException("No data available in Input");
+        }
+
+        @Override
+        public boolean next() throws IOException {
+          return false;
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public Progress getProgress() {
+          progress.complete();
+          return progress;
+        }
+      };
+    }
+
     waitForInputReady();
     return rawIter;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a153a776/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index c6ba6b1..147ba09 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -45,8 +45,6 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
   @SuppressWarnings("rawtypes")
   private BroadcastKVReader kvReader;
   
-  
-  
   public ShuffledUnorderedKVInput() {
   }
 
@@ -55,26 +53,54 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
     Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
     this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
     this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
-    
-    this.shuffleManager = new BroadcastShuffleManager(inputContext, conf, numInputs);
+
+    if (numInputs == 0) {
+      return null;
+    }
+
+    this.shuffleManager = new BroadcastShuffleManager(inputContext, conf,
+        numInputs);
     this.shuffleManager.run();
-    this.kvReader = this.shuffleManager.craeteReader();
+    this.kvReader = this.shuffleManager.createReader();
     return null;
   }
 
   @Override
   public KeyValueReader getReader() throws Exception {
+    if (numInputs == 0) {
+      return new KeyValueReader() {
+        @Override
+        public boolean next() throws IOException {
+          return false;
+        }
+
+        @Override
+        public Object getCurrentKey() throws IOException {
+          throw new RuntimeException("No data available in Input");
+        }
+
+        @Override
+        public Object getCurrentValue() throws IOException {
+          throw new RuntimeException("No data available in Input");
+        }
+      };
+    }
     return this.kvReader;
   }
 
   @Override
   public void handleEvents(List<Event> inputEvents) throws IOException {
+    if (numInputs == 0) {
+      throw new RuntimeException("No input events expected as numInputs is 0");
+    }
     shuffleManager.handleEvents(inputEvents);
   }
 
   @Override
   public List<Event> close() throws Exception {
-    this.shuffleManager.shutdown();
+    if (numInputs != 0) {
+      this.shuffleManager.shutdown();
+    }
     return null;
   }