You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:13 UTC
[04/50] [abbrv] tez git commit: TEZ-1699. Vertex.setParallelism should throw an exception for invalid invocations (bikas)
TEZ-1699. Vertex.setParallelism should throw an exception for invalid invocations (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/06e9f88e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/06e9f88e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/06e9f88e
Branch: refs/heads/TEZ-8
Commit: 06e9f88eb207f00ae2f31f46477001726754115c
Parents: 4e69bed
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Oct 29 18:12:41 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Oct 29 18:12:41 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 7 +++
.../tez/dag/api/VertexManagerPluginContext.java | 3 +-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 2 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 36 ++++++-------
.../tez/dag/app/dag/impl/VertexManager.java | 4 +-
.../tez/dag/app/dag/impl/TestDAGImpl.java | 12 ++---
.../tez/dag/app/dag/impl/TestVertexImpl.java | 56 +++++++++++++++-----
7 files changed, 78 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 933a445..7bc96ce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,8 @@ INCOMPATIBLE CHANGES
TEZ-1666. UserPayload should be null if the payload is not specified.
0.5.1 client cannot talk to 0.5.2 AMs (TEZ-1666 and TEZ-1664).
context.getUserPayload can now return null, apps may need to add defensive code.
+ TEZ-1699. Vertex.setParallelism should throw an exception for invalid
+ invocations
ALL CHANGES:
TEZ-1620. Wait for application finish before stopping MiniTezCluster
@@ -78,6 +80,11 @@ ALL CHANGES:
TEZ-1701. ATS fixes to flush all history events and also using batching.
TEZ-792. Default staging path should have user name.
TEZ-1689. addendum - fix unit test failure.
+ TEZ-1666. UserPayload should be null if the payload is not specified.
+ 0.5.1 client cannot talk to 0.5.2 AMs (TEZ-1666 and TEZ-1664).
+ context.getUserPayload can now return null, apps may need to add defensive code.
+ TEZ-1699. Vertex.setParallelism should throw an exception for invalid
+ invocations
Release 0.5.1: 2014-10-02
http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index 5bd8768..c1f4bcd 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -123,9 +123,8 @@ public interface VertexManagerPluginContext {
* @param sourceEdgeManagers Edge Managers to be updated
* @param rootInputSpecUpdate Updated Root Input specifications, if any.
* If none specified, a default of 1 physical input is used
- * @return true if the operation was allowed.
*/
- public boolean setVertexParallelism(int parallelism,
+ public void setVertexParallelism(int parallelism,
@Nullable VertexLocationHint locationHint,
@Nullable Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
@Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate);
http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index cefd34d..fa1f2c4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -87,7 +87,7 @@ public interface Vertex extends Comparable<Vertex> {
@Nullable
TaskLocationHint getTaskLocationHint(TezTaskID taskID);
- boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
+ void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
Map<String, InputSpecUpdate> rootInputSpecUpdate) throws AMUserCodeException;
void setVertexLocationHint(VertexLocationHint vertexLocationHint);
http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b8e99d4..5c76a77 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1198,14 +1198,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
@Override
- public boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
+ public void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
Map<String, InputSpecUpdate> rootInputSpecUpdates) throws AMUserCodeException {
- return setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates,
- false);
+ setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates, false);
}
- private boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
+ private void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
Map<String, InputSpecUpdate> rootInputSpecUpdates,
boolean recovering) throws AMUserCodeException {
@@ -1235,7 +1234,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
LOG.info("Got updated RootInputsSpecs during recovery: " + rootInputSpecUpdates.toString());
this.rootInputSpecs.putAll(rootInputSpecUpdates);
}
- return true;
+ return;
} finally {
writeLock.unlock();
}
@@ -1246,8 +1245,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
writeLock.lock();
try {
if (parallelismSet == true) {
- LOG.info("Parallelism can only be set dynamically once per vertex: " + logIdentifier);
- return false;
+ String msg = "Parallelism can only be set dynamically once per vertex: " + logIdentifier;
+ LOG.info(msg);
+ throw new TezUncheckedException(msg);
}
parallelismSet = true;
@@ -1311,9 +1311,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (parallelism > numTasks) {
// not that hard to support perhaps. but checking right now since there
// is no use case for it and checking may catch other bugs.
- LOG.warn("Increasing parallelism is not supported, vertexId="
- + logIdentifier);
- return false;
+ String msg = "Increasing parallelism is not supported, vertexId=" + logIdentifier;
+ LOG.warn(msg);
+ throw new TezUncheckedException(msg);
}
if (parallelism == numTasks) {
LOG.info("setParallelism same as current value: " + parallelism +
@@ -1342,10 +1342,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
Map.Entry<TezTaskID, Task> entry = iter.next();
Task task = entry.getValue();
if (task.getState() != TaskState.NEW) {
- LOG.warn(
- "All tasks must be in initial state when changing parallelism"
- + " for vertex: " + getVertexId() + " name: " + getName());
- return false;
+ String msg = "All tasks must be in initial state when changing parallelism"
+ + " for vertex: " + getVertexId() + " name: " + getName();
+ LOG.warn(msg);
+ throw new TezUncheckedException(msg);
}
if (i <= parallelism) {
continue;
@@ -1407,8 +1407,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
} finally {
writeLock.unlock();
}
-
- return true;
}
public void setVertexLocationHint(VertexLocationHint vertexLocationHint) {
@@ -2532,8 +2530,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
boolean successSetParallelism ;
try {
- successSetParallelism = vertex.setParallelism(0,
+ vertex.setParallelism(0,
null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true);
+ successSetParallelism = true;
} catch (Exception e) {
successSetParallelism = false;
}
@@ -2589,8 +2588,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
break;
}
try {
- successSetParallelism = vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers,
+ vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers,
vertex.recoveredRootInputSpecUpdates, true);
+ successSetParallelism = true;
} catch (Exception e) {
successSetParallelism = false;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index c5fe41f..1bfb0f9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -108,11 +108,11 @@ public class VertexManager {
}
@Override
- public boolean setVertexParallelism(int parallelism, VertexLocationHint vertexLocationHint,
+ public void setVertexParallelism(int parallelism, VertexLocationHint vertexLocationHint,
Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
Map<String, InputSpecUpdate> rootInputSpecUpdate) {
try {
- return managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers,
+ managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers,
rootInputSpecUpdate);
} catch (AMUserCodeException e) {
// workaround: convert it to TezUncheckedException which would be caught in VM
http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index af48c49..e79eeef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -889,7 +889,7 @@ public class TestDAGImpl {
}
@SuppressWarnings("unchecked")
- @Test()
+ @Test(timeout = 5000)
public void testEdgeManager_GetNumDestinationTaskPhysicalInputs() {
setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationTaskPhysicalInputs);
dispatcher.getEventHandler().handle(
@@ -912,7 +912,7 @@ public class TestDAGImpl {
}
@SuppressWarnings("unchecked")
- @Test()
+ @Test(timeout = 5000)
public void testEdgeManager_GetNumSourceTaskPhysicalOutputs() {
setupDAGWithCustomEdge(ExceptionLocation.GetNumSourceTaskPhysicalOutputs);
dispatcher.getEventHandler().handle(
@@ -932,7 +932,7 @@ public class TestDAGImpl {
}
@SuppressWarnings("unchecked")
- @Test()
+ @Test(timeout = 5000)
public void testEdgeManager_RouteDataMovementEventToDestination() {
setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination);
dispatcher.getEventHandler().handle(
@@ -962,7 +962,7 @@ public class TestDAGImpl {
}
@SuppressWarnings("unchecked")
- @Test()
+ @Test(timeout = 5000)
public void testEdgeManager_RouteInputSourceTaskFailedEventToDestination() {
setupDAGWithCustomEdge(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination);
dispatcher.getEventHandler().handle(
@@ -992,7 +992,7 @@ public class TestDAGImpl {
}
@SuppressWarnings("unchecked")
- @Test()
+ @Test(timeout = 5000)
public void testEdgeManager_GetNumDestinationConsumerTasks() {
setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationConsumerTasks);
dispatcher.getEventHandler().handle(
@@ -1023,7 +1023,7 @@ public class TestDAGImpl {
}
@SuppressWarnings("unchecked")
- @Test()
+ @Test(timeout = 5000)
public void testEdgeManager_RouteInputErrorEventToSource() {
setupDAGWithCustomEdge(ExceptionLocation.RouteInputErrorEventToSource);
dispatcher.getEventHandler().handle(
http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 55ee05f..fdf0e07 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -72,6 +72,7 @@ import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -2348,7 +2349,7 @@ public class TestVertexImpl {
Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors =
Collections.singletonMap(
v1.getName(), mockEdgeManagerDescriptor);
- assertTrue(v3.setParallelism(1, null, edgeManagerDescriptors, null));
+ v3.setParallelism(1, null, edgeManagerDescriptors, null);
assertTrue(v3.sourceVertices.get(v1).getEdgeManager() instanceof
EdgeManagerForTest);
Assert.assertEquals(1, v3.getTotalTasks());
@@ -2357,7 +2358,46 @@ public class TestVertexImpl {
assertTrue(tasks.keySet().iterator().next().equals(firstTask));
}
+
+ @Test(timeout = 5000)
+ public void testVertexSetParallelismIncreaseException() throws Exception {
+ initAllVertices(VertexState.INITED);
+ VertexImpl v3 = vertices.get("vertex3");
+ Assert.assertEquals(2, v3.getTotalTasks());
+ Map<TezTaskID, Task> tasks = v3.getTasks();
+ Assert.assertEquals(2, tasks.size());
+
+ VertexImpl v1 = vertices.get("vertex1");
+ startVertex(vertices.get("vertex2"));
+ startVertex(v1);
+
+ // increase not supported
+ try {
+ v3.setParallelism(100, null, null, null);
+ Assert.fail();
+ } catch (TezUncheckedException e) { }
+ }
+ @Test(timeout = 5000)
+ public void testVertexSetParallelismMultipleException() throws Exception {
+ initAllVertices(VertexState.INITED);
+ VertexImpl v3 = vertices.get("vertex3");
+ Assert.assertEquals(2, v3.getTotalTasks());
+ Map<TezTaskID, Task> tasks = v3.getTasks();
+ Assert.assertEquals(2, tasks.size());
+
+ VertexImpl v1 = vertices.get("vertex1");
+ startVertex(vertices.get("vertex2"));
+ startVertex(v1);
+ v3.setParallelism(1, null, null, null);
+
+ // multiple invocations not supported
+ try {
+ v3.setParallelism(1, null, null, null);
+ Assert.fail();
+ } catch (TezUncheckedException e) { }
+ }
+
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testVertexPendingTaskEvents() {
@@ -2415,8 +2455,7 @@ public class TestVertexImpl {
Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors =
Collections.singletonMap(v3.getName(), edgeManagerDescriptor);
- assertTrue(v5.setParallelism(v5.getTotalTasks() - 1, null,
- edgeManagerDescriptors, null)); // Must decrease.
+ v5.setParallelism(v5.getTotalTasks() - 1, null, edgeManagerDescriptors, null); // Must decrease.
VertexImpl v5Impl = (VertexImpl) v5;
@@ -2938,11 +2977,6 @@ public class TestVertexImpl {
}
- @Test(timeout = 5000)
- public void testCommitterInitAndSetup() {
- // FIXME need to add a test for this
- }
-
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testBadCommitter() throws Exception {
@@ -3252,10 +3286,6 @@ public class TestVertexImpl {
Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
}
- @Test(timeout = 5000)
- public void testHistoryEventGeneration() {
- }
-
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testInvalidEvent() {
@@ -3391,7 +3421,7 @@ public class TestVertexImpl {
initializer.stateUpdates.get(1).getVertexState());
}
- @Test(timeout = 1000000)
+ @Test(timeout = 10000)
public void testInputInitializerEventMultipleAttempts() throws Exception {
useCustomInitializer = true;
customInitializer = new EventHandlingRootInputInitializer(null);