You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:13 UTC

[04/50] [abbrv] tez git commit: TEZ-1699. Vertex.setParallelism should throw an exception for invalid invocations (bikas)

TEZ-1699. Vertex.setParallelism should throw an exception for invalid invocations (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/06e9f88e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/06e9f88e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/06e9f88e

Branch: refs/heads/TEZ-8
Commit: 06e9f88eb207f00ae2f31f46477001726754115c
Parents: 4e69bed
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Oct 29 18:12:41 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Oct 29 18:12:41 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  7 +++
 .../tez/dag/api/VertexManagerPluginContext.java |  3 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |  2 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 36 ++++++-------
 .../tez/dag/app/dag/impl/VertexManager.java     |  4 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 12 ++---
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 56 +++++++++++++++-----
 7 files changed, 78 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 933a445..7bc96ce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,8 @@ INCOMPATIBLE CHANGES
   TEZ-1666. UserPayload should be null if the payload is not specified.
     0.5.1 client cannot talk to 0.5.2 AMs (TEZ-1666 and TEZ-1664).
     context.getUserPayload can now return null, apps may need to add defensive code.
+  TEZ-1699. Vertex.setParallelism should throw an exception for invalid
+  invocations
 
 ALL CHANGES:
   TEZ-1620. Wait for application finish before stopping MiniTezCluster
@@ -78,6 +80,11 @@ ALL CHANGES:
   TEZ-1701. ATS fixes to flush all history events and also using batching.
   TEZ-792. Default staging path should have user name.
   TEZ-1689. addendum - fix unit test failure.
+  TEZ-1666. UserPayload should be null if the payload is not specified.
+    0.5.1 client cannot talk to 0.5.2 AMs (TEZ-1666 and TEZ-1664).
+    context.getUserPayload can now return null, apps may need to add defensive code.
+  TEZ-1699. Vertex.setParallelism should throw an exception for invalid
+  invocations
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index 5bd8768..c1f4bcd 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -123,9 +123,8 @@ public interface VertexManagerPluginContext {
    * @param sourceEdgeManagers Edge Managers to be updated
    * @param rootInputSpecUpdate Updated Root Input specifications, if any.
    *        If none specified, a default of 1 physical input is used
-   * @return true if the operation was allowed.
    */
-  public boolean setVertexParallelism(int parallelism,
+  public void setVertexParallelism(int parallelism,
       @Nullable VertexLocationHint locationHint,
       @Nullable Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
       @Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate);

http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index cefd34d..fa1f2c4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -87,7 +87,7 @@ public interface Vertex extends Comparable<Vertex> {
   @Nullable
   TaskLocationHint getTaskLocationHint(TezTaskID taskID);
 
-  boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
+  void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
       Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
       Map<String, InputSpecUpdate> rootInputSpecUpdate) throws AMUserCodeException;
   void setVertexLocationHint(VertexLocationHint vertexLocationHint);

http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b8e99d4..5c76a77 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1198,14 +1198,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   @Override
-  public boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
+  public void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
       Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
       Map<String, InputSpecUpdate> rootInputSpecUpdates) throws AMUserCodeException {
-    return setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates,
-        false);
+    setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates, false);
   }
 
-  private boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
+  private void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
       Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
       Map<String, InputSpecUpdate> rootInputSpecUpdates,
       boolean recovering) throws AMUserCodeException {
@@ -1235,7 +1234,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           LOG.info("Got updated RootInputsSpecs during recovery: " + rootInputSpecUpdates.toString());
           this.rootInputSpecs.putAll(rootInputSpecUpdates);
         }
-        return true;
+        return;
       } finally {
         writeLock.unlock();
       }
@@ -1246,8 +1245,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     writeLock.lock();
     try {
       if (parallelismSet == true) {
-        LOG.info("Parallelism can only be set dynamically once per vertex: " + logIdentifier);
-        return false;
+        String msg = "Parallelism can only be set dynamically once per vertex: " + logIdentifier; 
+        LOG.info(msg);
+        throw new TezUncheckedException(msg);
       }
 
       parallelismSet = true;
@@ -1311,9 +1311,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         if (parallelism >= numTasks) {
           // not that hard to support perhaps. but checking right now since there
           // is no use case for it and checking may catch other bugs.
-          LOG.warn("Increasing parallelism is not supported, vertexId="
-              + logIdentifier);
-          return false;
+          String msg = "Increasing parallelism is not supported, vertexId=" + logIdentifier; 
+          LOG.warn(msg);
+          throw new TezUncheckedException(msg);
         }
         if (parallelism == numTasks) {
           LOG.info("setParallelism same as current value: " + parallelism +
@@ -1342,10 +1342,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           Map.Entry<TezTaskID, Task> entry = iter.next();
           Task task = entry.getValue();
           if (task.getState() != TaskState.NEW) {
-            LOG.warn(
-                "All tasks must be in initial state when changing parallelism"
-                    + " for vertex: " + getVertexId() + " name: " + getName());
-            return false;
+            String msg = "All tasks must be in initial state when changing parallelism"
+                + " for vertex: " + getVertexId() + " name: " + getName(); 
+            LOG.warn(msg);
+            throw new TezUncheckedException(msg);
           }
           if (i <= parallelism) {
             continue;
@@ -1407,8 +1407,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     } finally {
       writeLock.unlock();
     }
-
-    return true;
   }
 
   public void setVertexLocationHint(VertexLocationHint vertexLocationHint) {
@@ -2532,8 +2530,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           }
           boolean successSetParallelism ;
           try {
-            successSetParallelism = vertex.setParallelism(0,
+            vertex.setParallelism(0,
               null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true);
+            successSetParallelism = true;
           } catch (Exception e) {
             successSetParallelism = false;
           }
@@ -2589,8 +2588,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             break;
           }
           try {
-            successSetParallelism = vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers,
+            vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers,
               vertex.recoveredRootInputSpecUpdates, true);
+            successSetParallelism = true;
           } catch (Exception e) {
             successSetParallelism = false;
           }

http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index c5fe41f..1bfb0f9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -108,11 +108,11 @@ public class VertexManager {
     }
 
     @Override
-    public boolean setVertexParallelism(int parallelism, VertexLocationHint vertexLocationHint,
+    public void setVertexParallelism(int parallelism, VertexLocationHint vertexLocationHint,
         Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
         Map<String, InputSpecUpdate> rootInputSpecUpdate) {
       try {
-        return managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers,
+        managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers,
             rootInputSpecUpdate);
       } catch (AMUserCodeException e) {
         // workaround: convert it to TezUncheckedException which would be caught in VM

http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index af48c49..e79eeef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -889,7 +889,7 @@ public class TestDAGImpl {
   }
   
   @SuppressWarnings("unchecked")
-  @Test()
+  @Test(timeout = 5000)
   public void testEdgeManager_GetNumDestinationTaskPhysicalInputs() {
     setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationTaskPhysicalInputs);
     dispatcher.getEventHandler().handle(
@@ -912,7 +912,7 @@ public class TestDAGImpl {
   }
 
   @SuppressWarnings("unchecked")
-  @Test()
+  @Test(timeout = 5000)
   public void testEdgeManager_GetNumSourceTaskPhysicalOutputs() {
     setupDAGWithCustomEdge(ExceptionLocation.GetNumSourceTaskPhysicalOutputs);
     dispatcher.getEventHandler().handle(
@@ -932,7 +932,7 @@ public class TestDAGImpl {
   }
   
   @SuppressWarnings("unchecked")
-  @Test()
+  @Test(timeout = 5000)
   public void testEdgeManager_RouteDataMovementEventToDestination() {
     setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination);
     dispatcher.getEventHandler().handle(
@@ -962,7 +962,7 @@ public class TestDAGImpl {
   }
   
   @SuppressWarnings("unchecked")
-  @Test()
+  @Test(timeout = 5000)
   public void testEdgeManager_RouteInputSourceTaskFailedEventToDestination() {
     setupDAGWithCustomEdge(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination);
     dispatcher.getEventHandler().handle(
@@ -992,7 +992,7 @@ public class TestDAGImpl {
   }
   
   @SuppressWarnings("unchecked")
-  @Test()
+  @Test(timeout = 5000)
   public void testEdgeManager_GetNumDestinationConsumerTasks() {
     setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationConsumerTasks);
     dispatcher.getEventHandler().handle(
@@ -1023,7 +1023,7 @@ public class TestDAGImpl {
   }
   
   @SuppressWarnings("unchecked")
-  @Test()
+  @Test(timeout = 5000)
   public void testEdgeManager_RouteInputErrorEventToSource() {
     setupDAGWithCustomEdge(ExceptionLocation.RouteInputErrorEventToSource);
     dispatcher.getEventHandler().handle(

http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 55ee05f..fdf0e07 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -72,6 +72,7 @@ import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -2348,7 +2349,7 @@ public class TestVertexImpl {
     Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors =
         Collections.singletonMap(
        v1.getName(), mockEdgeManagerDescriptor);
-    assertTrue(v3.setParallelism(1, null, edgeManagerDescriptors, null));
+    v3.setParallelism(1, null, edgeManagerDescriptors, null);
     assertTrue(v3.sourceVertices.get(v1).getEdgeManager() instanceof
         EdgeManagerForTest);
     Assert.assertEquals(1, v3.getTotalTasks());
@@ -2357,7 +2358,46 @@ public class TestVertexImpl {
     assertTrue(tasks.keySet().iterator().next().equals(firstTask));
 
   }
+
+  @Test(timeout = 5000)
+  public void testVertexSetParallelismIncreaseException() throws Exception {
+    initAllVertices(VertexState.INITED);
+    VertexImpl v3 = vertices.get("vertex3");
+    Assert.assertEquals(2, v3.getTotalTasks());
+    Map<TezTaskID, Task> tasks = v3.getTasks();
+    Assert.assertEquals(2, tasks.size());
+
+    VertexImpl v1 = vertices.get("vertex1");
+    startVertex(vertices.get("vertex2"));
+    startVertex(v1);
+
+    // increase not supported
+    try {
+      v3.setParallelism(100, null, null, null);
+      Assert.fail();
+    } catch (TezUncheckedException e) { }
+  }
   
+  @Test(timeout = 5000)
+  public void testVertexSetParallelismMultipleException() throws Exception {
+    initAllVertices(VertexState.INITED);
+    VertexImpl v3 = vertices.get("vertex3");
+    Assert.assertEquals(2, v3.getTotalTasks());
+    Map<TezTaskID, Task> tasks = v3.getTasks();
+    Assert.assertEquals(2, tasks.size());
+
+    VertexImpl v1 = vertices.get("vertex1");
+    startVertex(vertices.get("vertex2"));
+    startVertex(v1);
+    v3.setParallelism(1, null, null, null);
+
+    // multiple invocations not supported
+    try {
+      v3.setParallelism(1, null, null, null);
+      Assert.fail();
+    } catch (TezUncheckedException e) { }
+  }
+
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexPendingTaskEvents() {
@@ -2415,8 +2455,7 @@ public class TestVertexImpl {
 
     Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors =
         Collections.singletonMap(v3.getName(), edgeManagerDescriptor);
-    assertTrue(v5.setParallelism(v5.getTotalTasks() - 1, null,
-        edgeManagerDescriptors, null)); // Must decrease.
+    v5.setParallelism(v5.getTotalTasks() - 1, null, edgeManagerDescriptors, null); // Must decrease.
 
     VertexImpl v5Impl = (VertexImpl) v5;
 
@@ -2938,11 +2977,6 @@ public class TestVertexImpl {
     
   }
 
-  @Test(timeout = 5000)
-  public void testCommitterInitAndSetup() {
-    // FIXME need to add a test for this
-  }
-
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testBadCommitter() throws Exception {
@@ -3252,10 +3286,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
   }
 
-  @Test(timeout = 5000)
-  public void testHistoryEventGeneration() {
-  }
-
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testInvalidEvent() {
@@ -3391,7 +3421,7 @@ public class TestVertexImpl {
         initializer.stateUpdates.get(1).getVertexState());
   }
 
-  @Test(timeout = 1000000)
+  @Test(timeout = 10000)
   public void testInputInitializerEventMultipleAttempts() throws Exception {
     useCustomInitializer = true;
     customInitializer = new EventHandlingRootInputInitializer(null);