You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/05 22:00:45 UTC
[1/8] tez git commit: TEZ-1929. pre-empted tasks should be marked as
killed instead of failed (bikas)
Repository: tez
Updated Branches:
refs/heads/TEZ-2003 753b7efde -> f49c054c9 (forced update)
TEZ-1929. pre-empted tasks should be marked as killed instead of failed (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6ba1339d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6ba1339d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6ba1339d
Branch: refs/heads/TEZ-2003
Commit: 6ba1339d57a5d05fa14f35f352076131bffea483
Parents: 21bce95
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Jan 30 16:40:11 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Jan 30 16:40:11 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../rm/container/AMContainerEventCompleted.java | 5 +--
.../dag/app/rm/container/AMContainerImpl.java | 4 +--
.../tez/dag/app/TestMockDAGAppMaster.java | 38 +++++++++++++++++++-
.../app/rm/TestTaskSchedulerEventHandler.java | 2 +-
.../dag/app/rm/container/TestAMContainer.java | 6 ++--
6 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3414adc..43d009d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -55,6 +55,7 @@ Release 0.6.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1929. pre-empted tasks should be marked as killed instead of failed
TEZ-2017. TEZ UI - Dag view throwing error whild re-displaying additionals in some dags.
TEZ-2013. TEZ UI - App Details Page UI Nits
TEZ-2014. Tez UI: Nits : All tables, Vertices Page UI.
http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index a455f1e..9bb6d7f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -37,14 +37,15 @@ public class AMContainerEventCompleted extends AMContainerEvent {
}
public boolean isPreempted() {
- return (exitStatus == ContainerExitStatus.PREEMPTED);
+ return (exitStatus == ContainerExitStatus.PREEMPTED ||
+ errCause == TaskAttemptTerminationCause.INTERNAL_PREEMPTION);
}
public boolean isDiskFailed() {
return (exitStatus == ContainerExitStatus.DISKS_FAILED);
}
- public boolean isClusterAction() {
+ public boolean isSystemAction() {
return isPreempted() || isDiskFailed();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 536001c..5c5a8c5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -662,7 +662,7 @@ public class AMContainerImpl implements AMContainer {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
if (container.pendingAttempt != null) {
String errorMessage = getMessage(container, event);
- if (event.isClusterAction()) {
+ if (event.isSystemAction()) {
container.sendContainerTerminatedBySystemToTaskAttempt(container.pendingAttempt,
errorMessage, event.getTerminationCause());
} else {
@@ -921,7 +921,7 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
- if (event.isClusterAction()) {
+ if (event.isSystemAction()) {
container.sendContainerTerminatedBySystemToTaskAttempt(container.runningAttempt,
getMessage(container, event), event.getTerminationCause());
} else {
http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index bed971a..a46821a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -37,10 +36,16 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
import org.junit.Assert;
import org.junit.Test;
@@ -104,6 +109,37 @@ public class TestMockDAGAppMaster {
tezClient.stop();
}
+ @Test (timeout = 5000)
+ public void testInternalPreemption() throws Exception {
+ TezConfiguration tezconf = new TezConfiguration(defaultConf);
+
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
+ tezClient.start();
+
+ MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+ MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+ mockLauncher.startScheduling(false);
+ // there is only 1 task whose first attempt will be preempted
+ DAG dag = DAG.create("test");
+ Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1);
+ dag.addVertex(vA);
+
+ DAGClient dagClient = tezClient.submitDAG(dag);
+ mockLauncher.waitTillContainersLaunched();
+ ContainerData cData = mockLauncher.getContainers().values().iterator().next();
+ DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+ mockApp.getTaskSchedulerEventHandler().preemptContainer(cData.cId);
+
+ mockLauncher.startScheduling(true);
+ dagClient.waitForCompletion();
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+ TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
+ TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
+ TaskAttempt killedTa = dagImpl.getVertex(vA.getName()).getTask(0).getAttempt(killedTaId);
+ Assert.assertEquals(TaskAttemptState.KILLED, killedTa.getState());
+ tezClient.stop();
+ }
+
@Test (timeout = 10000)
public void testMultipleSubmissions() throws Exception {
Map<String, LocalResource> lrDAG = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index d2dece3..62618cc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -204,7 +204,7 @@ public class TestTaskSchedulerEventHandler {
AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
Assert.assertEquals(mockCId, completedEvent.getContainerId());
Assert.assertEquals("Container preempted internally", completedEvent.getDiagnostics());
- Assert.assertFalse(completedEvent.isPreempted());
+ Assert.assertTrue(completedEvent.isPreempted());
Assert.assertFalse(completedEvent.isDiskFailed());
Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
completedEvent.getTerminationCause());
http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index f273896..438c50d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -704,10 +704,10 @@ public class TestAMContainer {
verify(wc.chh).unregister(wc.containerID);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
- Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
- ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
- TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+ Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
+ ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
assertFalse(wc.amContainer.isInErrorState());
[4/8] tez git commit: TEZ-1895. Vertex reRunning should decrease
successfulMembers of VertexGroupInfo (zjffdu)
Posted by ss...@apache.org.
TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cfa637a1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cfa637a1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cfa637a1
Branch: refs/heads/TEZ-2003
Commit: cfa637a16fa01b197c0310e03ef4a6e19883aaf1
Parents: ad6bf07
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Feb 2 10:45:32 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Mon Feb 2 10:45:32 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/DAGTerminationCause.java | 3 ++
.../tez/dag/app/dag/VertexTerminationCause.java | 3 ++
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 25 ++++++---
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 23 +++++++++
.../tez/dag/app/dag/impl/TestDAGImpl.java | 53 +++++++++++++++++++-
6 files changed, 101 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 03b0624..24c5b32 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo.
TEZ-1999. IndexOutOfBoundsException during merge.
TEZ-2000. Source vertex exists error during DAG submission.
TEZ-2008. Add methods to SecureShuffleUtils to verify a reply based on a provided Key.
http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
index d01fb2f..5ae96a1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
@@ -39,6 +39,9 @@ public enum DAGTerminationCause {
/** DAG failed during output commit. */
COMMIT_FAILURE,
+ /** In some cases a vertex cannot be rerun, e.g. when its output has already been committed as a shared output of a vertex group. */
+ VERTEX_RERUN_AFTER_COMMIT,
+
/** DAG failed while trying to write recovery events */
RECOVERY_FAILURE,
http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
index 4bfe001..2eeae3c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -43,6 +43,9 @@ public enum VertexTerminationCause {
/** This vertex failed during commit. */
COMMIT_FAILURE,
+ /** In some cases a vertex cannot be rerun, e.g. when its output has already been committed as a shared output of a vertex group. */
+ VERTEX_RERUN_AFTER_COMMIT,
+
/** This vertex failed as it had invalid number tasks. */
INVALID_NUM_OF_TASKS,
http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index f4e5bad..aa7723b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1083,6 +1083,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
dag.addDiagnostic(diagnosticMsg);
return dag.finished(DAGState.FAILED);
}
+ if(dag.terminationCause == DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT ){
+ String diagnosticMsg = "DAG failed due to vertex rerun after commit." +
+ " failedVertices:" + dag.numFailedVertices +
+ " killedVertices:" + dag.numKilledVertices;
+ LOG.info(diagnosticMsg);
+ dag.addDiagnostic(diagnosticMsg);
+ return dag.finished(DAGState.FAILED);
+ }
if(dag.terminationCause == DAGTerminationCause.RECOVERY_FAILURE ){
String diagnosticMsg = "DAG failed due to failure in recovery handling." +
" failedVertices:" + dag.numFailedVertices +
@@ -1738,9 +1746,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
public DAGState transition(DAGImpl job, DAGEvent event) {
DAGEventVertexReRunning vertexEvent = (DAGEventVertexReRunning) event;
Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
- job.numCompletedVertices--;
boolean failed = job.vertexReRunning(vertex);
-
+ if (!failed) {
+ job.numCompletedVertices--;
+ }
LOG.info("Vertex " + vertex.getLogIdentifier() + " re-running."
+ ", numCompletedVertices=" + job.numCompletedVertices
@@ -1848,11 +1857,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
if (groupList != null) {
for (VertexGroupInfo groupInfo : groupList) {
if (groupInfo.committed) {
- LOG.info("Aborting job as committed vertex: "
- + vertex.getLogIdentifier() + " is re-running");
- enactKill(DAGTerminationCause.COMMIT_FAILURE,
- VertexTerminationCause.COMMIT_FAILURE);
+ String msg = "Aborting job as committed vertex: "
+ + vertex.getLogIdentifier() + " is re-running";
+ LOG.info(msg);
+ addDiagnostic(msg);
+ enactKill(DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT,
+ VertexTerminationCause.VERTEX_RERUN_AFTER_COMMIT);
return true;
+ } else {
+ groupInfo.successfulMembers--;
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 577c98b..c3f4ae7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1826,6 +1826,28 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.abortVertex(State.FAILED);
return vertex.finished(VertexState.FAILED);
}
+ else if (vertex.terminationCause == VertexTerminationCause.COMMIT_FAILURE) {
+ vertex.setFinishTime();
+ String diagnosticMsg = "Vertex failed/killed due to COMMIT_FAILURE failed. "
+ + "failedTasks:"
+ + vertex.failedTaskCount
+ + " killedTasks:"
+ + vertex.killedTaskCount;
+ LOG.info(diagnosticMsg);
+ vertex.abortVertex(State.FAILED);
+ return vertex.finished(VertexState.FAILED);
+ }
+ else if (vertex.terminationCause == VertexTerminationCause.VERTEX_RERUN_AFTER_COMMIT) {
+ vertex.setFinishTime();
+ String diagnosticMsg = "Vertex failed/killed due to invalid rerun failed. "
+ + "failedTasks:"
+ + vertex.failedTaskCount
+ + " killedTasks:"
+ + vertex.killedTaskCount;
+ LOG.info(diagnosticMsg);
+ vertex.abortVertex(State.FAILED);
+ return vertex.finished(VertexState.FAILED);
+ }
else {
//should never occur
throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex:" + vertex.logIdentifier
@@ -3472,6 +3494,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
case INIT_FAILURE:
case INTERNAL_ERROR:
case AM_USERCODE_FAILURE:
+ case VERTEX_RERUN_AFTER_COMMIT:
case OTHER_VERTEX_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break;
default://should not occur
throw new TezUncheckedException("VertexKilledTransition: event.terminationCause is unexpected: " + trigger);
http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 7c4d715..cae9059 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -91,6 +91,7 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
+import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -774,6 +775,9 @@ public class TestDAGImpl {
doReturn(appAttemptId.getApplicationId())
.when(groupAppContext).getApplicationID();
doReturn(historyEventHandler).when(groupAppContext).getHistoryHandler();
+
+ // reset totalCommitCounter to 0
+ TotalCountingOutputCommitter.totalCommitCounter = 0;
taskEventDispatcher = new TaskEventDispatcher();
dispatcher.register(TaskEventType.class, taskEventDispatcher);
taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
@@ -1080,7 +1084,54 @@ public class TestDAGImpl {
Assert.assertEquals(DAGState.SUCCEEDED, groupDag.getState());
Assert.assertEquals(2, TotalCountingOutputCommitter.totalCommitCounter);
}
-
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testGroupDAGWithVertexReRunning() {
+ conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+ initDAG(groupDag);
+ startDAG(groupDag);
+ dispatcher.await();
+
+ Vertex v1 = groupDag.getVertex("vertex1");
+ Vertex v2 = groupDag.getVertex("vertex2");
+ dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+ dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(v1.getVertexId()));
+ dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
+ dispatcher.await();
+ // commit should not happen due to vertex-rerunning
+ Assert.assertEquals(0, TotalCountingOutputCommitter.totalCommitCounter);
+
+ dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+ dispatcher.await();
+ // commit happens
+ Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
+ Assert.assertEquals(2, groupDag.getSuccessfulVertices());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testGroupDAGWithVertexReRunningAfterCommit() {
+ conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+ initDAG(groupDag);
+ startDAG(groupDag);
+ dispatcher.await();
+
+ Vertex v1 = groupDag.getVertex("vertex1");
+ Vertex v2 = groupDag.getVertex("vertex2");
+ dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+ dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
+ dispatcher.await();
+ // vertex group commit happens
+ Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
+
+ // the DAG fails when a vertex re-run happens after the vertex group commit is done.
+ dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(v1.getVertexId()));
+ dispatcher.await();
+ Assert.assertEquals(DAGState.FAILED, groupDag.getState());
+ Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT, groupDag.getTerminationCause());
+ }
+
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testDAGCompletionWithCommitSuccess() {
[3/8] tez git commit: TEZ-1999. IndexOutOfBoundsException during
merge (rbalamohan)
Posted by ss...@apache.org.
TEZ-1999. IndexOutOfBoundsException during merge (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ad6bf07e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ad6bf07e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ad6bf07e
Branch: refs/heads/TEZ-2003
Commit: ad6bf07eba9923fca2627503652d16cfceb72d39
Parents: b726869
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Sat Jan 31 19:53:23 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Sat Jan 31 19:53:23 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../library/common/sort/impl/TezMerger.java | 29 +-
.../library/common/sort/impl/TestTezMerger.java | 518 ++++++++++++++++++-
3 files changed, 525 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ad6bf07e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5c0bec0..03b0624 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1999. IndexOutOfBoundsException during merge.
TEZ-2000. Source vertex exists error during DAG submission.
TEZ-2008. Add methods to SecureShuffleUtils to verify a reply based on a provided Key.
TEZ-1995. Build failure against hadoop 2.2.
http://git-wip-us.apache.org/repos/asf/tez/blob/ad6bf07e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index ed9a59d..5dd538a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -487,13 +487,32 @@ public class TezMerger {
return value;
}
+ private void populatePreviousKey() throws IOException {
+ key.reset();
+ BufferUtils.copy(key, prevKey);
+ }
+
private void adjustPriorityQueue(Segment reader) throws IOException{
long startPos = reader.getPosition();
- if (hasNext != null && hasNext != KeyState.SAME_KEY) {
- key.reset();
- // TODO: This copy can be an unwanted operation when all keys are unique. Revisit this
- // when we have better stats.
- BufferUtils.copy(key, prevKey);
+ if (hasNext == null) {
+ /**
+ * hasNext can be null during first iteration & prevKey is initialized here.
+ * In cases of NO_KEY/NEW_KEY, we readjust the queue later. If new segment/file is found
+ * during this process, we need to compare keys for RLE across segment boundaries.
+ * prevKey can't be empty at that time (e.g custom comparators)
+ */
+ populatePreviousKey();
+ } else {
+ //indicates a key has been read already
+ if (hasNext != KeyState.SAME_KEY) {
+ /**
+ * Store previous key before reading next for later key comparisons.
+ * If all keys in a segment are unique, it would always hit this code path and key copies
+ * are wasteful in such condition, as these comparisons are mainly done for RLE.
+ * TODO: When better stats are available, this condition can be avoided.
+ */
+ populatePreviousKey();
+ }
}
hasNext = reader.readRawKey();
long endPos = reader.getPosition();
http://git-wip-us.apache.org/repos/asf/tez/blob/ad6bf07e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
index ac17d8d..1e14b9b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
@@ -18,6 +18,7 @@
package org.apache.tez.runtime.library.common.sort.impl;
+import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
@@ -32,23 +33,27 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.TestMergeManager;
-import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import static org.junit.Assert.assertTrue;
+
public class TestTezMerger {
private static final Log LOG = LogFactory.getLog(TestTezMerger.class);
@@ -56,6 +61,11 @@ public class TestTezMerger {
private static Configuration defaultConf = new Configuration();
private static FileSystem localFs = null;
private static Path workDir = null;
+ private static RawComparator comparator = null;
+ private static Random rnd = new Random();
+
+ private static final String SAME_KEY = "SAME_KEY";
+ private static final String DIFF_KEY = "DIFF_KEY";
//store the generated data for final verification
private static ListMultimap<Integer, Long> verificationDataSet = LinkedListMultimap.create();
@@ -76,6 +86,7 @@ public class TestTezMerger {
Path baseDir = new Path(workDir, TestMergeManager.class.getName());
String localDirs = baseDir.toString();
defaultConf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+ comparator = ConfigUtils.getIntermediateInputKeyComparator(defaultConf);
}
@AfterClass
@@ -83,8 +94,7 @@ public class TestTezMerger {
localFs.delete(workDir, true);
}
-
- @Test
+ @Test(timeout = 80000)
public void testMerge() throws Exception {
/**
* test with number of files, keys per file and mergefactor
@@ -95,6 +105,7 @@ public class TestTezMerger {
merge(100, 0, 5);
//small files
+ merge(12, 4, 2);
merge(2, 10, 2);
merge(1, 10, 1);
merge(5, 10, 3);
@@ -105,17 +116,487 @@ public class TestTezMerger {
merge(5, 1000, 5);
merge(5, 1000, 10);
merge(5, 1000, 100);
+
+ //Create random mix of files (empty files + files with keys)
+ List<Path> pathList = new LinkedList<Path>();
+ pathList.clear();
+ pathList.addAll(createIFiles(Math.max(2, rnd.nextInt(20)), 0));
+ pathList.addAll(createIFiles(Math.max(2, rnd.nextInt(20)), Math.max(2, rnd.nextInt(10))));
+ merge(pathList, Math.max(2, rnd.nextInt(10)));
+ }
+
+ private Path createIFileWithTextData(List<String> data) throws IOException {
+ Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out");
+ FSDataOutputStream out = localFs.create(path);
+ IFile.Writer writer = new IFile.Writer(defaultConf, out, Text.class,
+ Text.class, null, null, null, true);
+ for (String key : data) {
+ writer.append(new Text(key), new Text(key + "_" + System.nanoTime()));
+ }
+ writer.close();
+ out.close();
+ return path;
+ }
+
+ /**
+ * Verify if the records are as per the expected data set
+ *
+ * @param records
+ * @param expectedResult
+ * @throws IOException
+ */
+ private void verify(TezRawKeyValueIterator records, String[][] expectedResult)
+ throws IOException {
+ //Iterate through merged dataset (shouldn't throw any exceptions)
+ int i = 0;
+ while (records.next()) {
+ DataInputBuffer key = records.getKey();
+ DataInputBuffer value = records.getValue();
+
+ Text k = new Text();
+ k.readFields(key);
+ Text v = new Text();
+ v.readFields(value);
+
+ assertTrue(k.toString().equals(expectedResult[i][0]));
+
+ String correctResult = expectedResult[i][1];
+
+ if (records.isSameKey()) {
+ assertTrue("Expected " + correctResult, correctResult.equalsIgnoreCase(SAME_KEY));
+ LOG.info("\tSame Key : key=" + k + ", val=" + v);
+ } else {
+ assertTrue("Expected " + correctResult, correctResult.equalsIgnoreCase(DIFF_KEY));
+ LOG.info("key=" + k + ", val=" + v);
+ }
+
+ i++;
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_WithEmptyStrings() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+ //Merge datasets with custom comparator
+ RawComparator rc = new CustomComparator();
+
+ LOG.info("Test with custom comparator with empty strings in middle");
+
+ //Test with 4 files, where some texts are empty strings
+ data.add("0");
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file with empty key
+ data.clear();
+ data.add("");
+ pathList.add(createIFileWithTextData(data));
+
+ //Third file
+ data.clear();
+ data.add("0");
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Fourth file
+ data.clear();
+ data.add("1");
+ data.add("2");
+ pathList.add(createIFileWithTextData(data));
+
+ TezRawKeyValueIterator records = merge(pathList, rc);
+
+ String[][] expectedResult =
+ {
+ //formatting intentionally
+ { "", DIFF_KEY },
+ { "0", DIFF_KEY },
+ { "0", SAME_KEY },
+ { "0", SAME_KEY },
+ { "0", SAME_KEY },
+ { "1", DIFF_KEY },
+ { "2", DIFF_KEY }
+ };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_No_RLE() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+ //Merge datasets with custom comparator
+ RawComparator rc = new CustomComparator();
+
+ LOG.info("Test with custom comparator with no RLE");
+
+ //Test with 3 files,
+ data.add("1");
+ data.add("4");
+ data.add("5");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file
+ data.clear();
+ data.add("2");
+ data.add("6");
+ data.add("7");
+ pathList.add(createIFileWithTextData(data));
+
+ //Third file
+ data.clear();
+ data.add("3");
+ data.add("8");
+ data.add("9");
+ pathList.add(createIFileWithTextData(data));
+
+ TezRawKeyValueIterator records = merge(pathList, rc);
+
+ String[][] expectedResult =
+ {
+ { "1", DIFF_KEY },
+ { "2", DIFF_KEY },
+ { "3", DIFF_KEY },
+ { "4", DIFF_KEY },
+ { "5", DIFF_KEY },
+ { "6", DIFF_KEY },
+ { "7", DIFF_KEY },
+ { "8", DIFF_KEY },
+ { "9", DIFF_KEY }
+ };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_RLE_acrossFiles() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+
+ LOG.info("Test with custom comparator with RLE spanning across segment boundaries");
+
+ //Test with 2 files, where the RLE keys can span across files
+ //First file
+ data.clear();
+ data.add("0");
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file
+ data.clear();
+ data.add("0");
+ data.add("1");
+ pathList.add(createIFileWithTextData(data));
+
+ //Merge datasets with custom comparator
+ RawComparator rc = new CustomComparator();
+ TezRawKeyValueIterator records = merge(pathList, rc);
+
+ //expected result
+ String[][] expectedResult =
+ {
+ //formatting intentionally
+ { "0", DIFF_KEY },
+ { "0", SAME_KEY },
+ { "0", SAME_KEY },
+ { "1", DIFF_KEY }
+ };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_mixedFiles() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+
+ LOG.info("Test with custom comparator with mixed set of segments (empty, non-empty etc)");
+
+ //Test with 4 files: a single key, an empty file, an empty key, and repeated keys
+ //First file
+ data.clear();
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file; empty file
+ data.clear();
+ pathList.add(createIFileWithTextData(data));
+
+ //Third file with empty key
+ data.clear();
+ data.add("");
+ pathList.add(createIFileWithTextData(data));
+
+ //Fourth file with repeated keys
+ data.clear();
+ data.add("0");
+ data.add("0");
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Merge datasets with custom comparator
+ RawComparator rc = new CustomComparator();
+ TezRawKeyValueIterator records = merge(pathList, rc);
+
+ //expected result
+ String[][] expectedResult =
+ {
+ //formatting intentionally
+ { "", DIFF_KEY },
+ { "0", DIFF_KEY },
+ { "0", SAME_KEY },
+ { "0", SAME_KEY },
+ { "0", SAME_KEY }
+ };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_RLE() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+
+ LOG.info("Test with custom comparator 2 files one containing RLE and also other segment "
+ + "starting with same key");
+
+ //Test with 2 files, same keys in middle of file
+ //First file
+ data.clear();
+ data.add("1");
+ data.add("2");
+ data.add("2");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file
+ data.clear();
+ data.add("2");
+ data.add("3");
+ pathList.add(createIFileWithTextData(data));
+
+ TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+
+ String[][] expectedResult =
+ {
+ //formatting intentionally
+ { "1", DIFF_KEY },
+ { "2", DIFF_KEY },
+ { "2", SAME_KEY },
+ { "2", SAME_KEY },
+ { "3", DIFF_KEY }
+ };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_RLE2() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+
+ LOG.info(
+ "Test with custom comparator 3 files with RLE (starting keys) spanning across boundaries");
+
+ //Test with 3 files, same keys in middle of file
+ //First file
+ data.clear();
+ data.add("0");
+ data.add("1");
+ data.add("1");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file
+ data.clear();
+ data.add("0");
+ data.add("1");
+ pathList.add(createIFileWithTextData(data));
+
+ //Third file
+ data.clear();
+ data.add("0");
+ data.add("1");
+ data.add("1");
+ pathList.add(createIFileWithTextData(data));
+
+ TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+ String[][] expectedResult =
+ {
+ //formatting intentionally
+ { "0", DIFF_KEY },
+ { "0", SAME_KEY },
+ { "0", SAME_KEY },
+ { "1", DIFF_KEY },
+ { "1", SAME_KEY },
+ { "1", SAME_KEY },
+ { "1", SAME_KEY },
+ { "1", SAME_KEY }
+
+ };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+
+ LOG.info(
+ "Test with custom comparator 3 files with RLE (starting keys) spanning across boundaries");
+
+ //Test with 3 files
+ //First file
+ data.clear();
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file
+ data.clear();
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Third file
+ data.clear();
+ data.add("1");
+ pathList.add(createIFileWithTextData(data));
+
+ TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+ String[][] expectedResult =
+ {
+ //formatting intentionally
+ { "0", DIFF_KEY },
+ { "0", SAME_KEY },
+ { "1", DIFF_KEY }
+ };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_RLE3() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+
+ LOG.info("Test with custom comparator");
+
+ //Test with 2 files, where the second file starts with the first file's key and then repeats a key
+ //First file
+ data.clear();
+ data.add("0");
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file
+ data.clear();
+ data.add("0");
+ data.add("1");
+ data.add("1");
+ pathList.add(createIFileWithTextData(data));
+
+ TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+
+ String[][] expectedResult =
+ {
+ //formatting intentionally
+ { "0", DIFF_KEY },
+ { "0", SAME_KEY },
+ { "1", DIFF_KEY },
+ { "1", SAME_KEY } };
+
+ verify(records, expectedResult);
+ pathList.clear();
+ data.clear();
+ }
+
+ @Test(timeout = 5000)
+ public void testWithCustomComparator_allEmptyFiles() throws Exception {
+ List<Path> pathList = new LinkedList<Path>();
+ List<String> data = Lists.newLinkedList();
+
+ LOG.info("Test with custom comparator where all files are empty");
+
+ //First file
+ pathList.add(createIFileWithTextData(data));
+
+ //Second file
+ pathList.add(createIFileWithTextData(data));
+
+ //Third file
+ pathList.add(createIFileWithTextData(data));
+
+ //Fourth file
+ pathList.add(createIFileWithTextData(data));
+
+ TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+
+ String[][] expectedResult = new String[0][0];
+
+ verify(records, expectedResult);
+ }
+
+ /**
+ * Merge the data sets
+ *
+ * @param pathList
+ * @param rc
+ * @return
+ * @throws IOException
+ */
+ private TezRawKeyValueIterator merge(List<Path> pathList, RawComparator rc) throws IOException {
+ TezMerger merger = new TezMerger();
+ TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
+ LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
+ true, 4, new Path(workDir, "tmp_" + System.nanoTime()), ((rc == null) ? comparator : rc),
+ new Reporter(), null, null,
+ null, new Progress());
+ return records;
+ }
+
+
+
+ //Sample comparator to test TEZ-1999 corner case
+ static class CustomComparator extends WritableComparator {
+ @Override
+ //Not a valid comparison, but just to check byte boundaries
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ Preconditions.checkArgument(l2 > 0 && l1 > 0, "l2=" + l2 + ",l1=" + l1);
+ ByteBuffer bb1 = ByteBuffer.wrap(b1, s1, l1);
+ ByteBuffer bb2 = ByteBuffer.wrap(b2, s2, l2);
+ return bb1.compareTo(bb2);
+ }
+ }
+
+ private void merge(List<Path> pathList, int mergeFactor) throws Exception {
+ merge(pathList, mergeFactor, null);
}
private void merge(int fileCount, int keysPerFile, int mergeFactor) throws Exception {
List<Path> pathList = createIFiles(fileCount, keysPerFile);
+ merge(pathList, mergeFactor, null);
+ }
+ private void merge(List<Path> pathList, int mergeFactor, RawComparator rc) throws Exception {
//Merge datasets
TezMerger merger = new TezMerger();
TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
true, mergeFactor, new Path(workDir, "tmp_" + System.nanoTime()),
- ConfigUtils.getIntermediateInputKeyComparator(defaultConf), new Reporter(), null, null,
+ ((rc == null) ? comparator : rc), new Reporter(), null, null,
null,
new Progress());
@@ -134,9 +615,9 @@ public class TestTezMerger {
if (records.isSameKey()) {
LOG.info("\tSame Key : key=" + k.get() + ", val=" + v.get());
//More than one key should be present in the source data
- Assert.assertTrue(verificationDataSet.get(k.get()).size() > 1);
+ assertTrue(verificationDataSet.get(k.get()).size() > 1);
//Ensure this is same as the previous key we saw
- Assert.assertTrue(pk == k.get());
+ assertTrue("previousKey=" + pk + ", current=" + k.get(), pk == k.get());
} else {
LOG.info("key=" + k.get() + ", val=" + v.get());
}
@@ -147,30 +628,30 @@ public class TestTezMerger {
}
//Verify if the number of distinct entries is the same in source and the test
- Assert.assertTrue("dataMap=" + dataMap.keySet().size() + ", verificationSet=" +
- verificationDataSet.keySet().size(),
+ assertTrue("dataMap=" + dataMap.keySet().size() + ", verificationSet=" +
+ verificationDataSet.keySet().size(),
dataMap.keySet().size() == verificationDataSet.keySet().size());
//Verify with source data
for (Integer key : verificationDataSet.keySet()) {
- Assert.assertTrue("Data size for " + key + " not matching with source; dataSize:" + dataMap
+ assertTrue("Data size for " + key + " not matching with source; dataSize:" + dataMap
.get(key).intValue() + ", source:" + verificationDataSet.get(key).size(),
dataMap.get(key).intValue() == verificationDataSet.get(key).size());
}
//Verify if every key has the same number of repeated items in the source dataset as well
for (Map.Entry<Integer, Integer> entry : dataMap.entrySet()) {
- Assert.assertTrue(entry.getKey() + "", verificationDataSet.get(entry.getKey()).size() == entry
+ assertTrue(entry.getKey() + "", verificationDataSet.get(entry.getKey()).size() == entry
.getValue());
}
LOG.info("******************");
+ verificationDataSet.clear();
}
private List<Path> createIFiles(int fileCount, int keysPerFile)
throws IOException {
List<Path> pathList = Lists.newLinkedList();
- verificationDataSet.clear();
Random rnd = new Random();
for (int i = 0; i < fileCount; i++) {
int repeatCount = ((i % 2 == 0) && keysPerFile > 0) ? rnd.nextInt(keysPerFile) : 0;
@@ -180,8 +661,10 @@ public class TestTezMerger {
return pathList;
}
- static Path writeIFile(int keysPerFile, int repeatCount) throws IOException {
+ static Path writeIFile(int keysPerFile, int repeatCount) throws
+ IOException {
TreeMultimap<Integer, Long> dataSet = createDataForIFile(keysPerFile, repeatCount);
+ LOG.info("DataSet size : " + dataSet.size());
Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out");
FSDataOutputStream out = localFs.create(path);
//create IFile with RLE
@@ -202,7 +685,7 @@ public class TestTezMerger {
/**
* Generate data set for ifile. Create repeated keys if needed.
*
- * @param keyCount approximate number of keys to be created
+ * @param keyCount approximate number of keys to be created
* @param repeatCount number of times a key should be repeated
* @return
*/
@@ -210,9 +693,9 @@ public class TestTezMerger {
TreeMultimap<Integer, Long> dataSet = TreeMultimap.create();
Random rnd = new Random();
for (int i = 0; i < keyCount; i++) {
- if (repeatCount > 0 && (rnd.nextInt(keyCount) % 2 == 0)) {
+ if (repeatCount > 0 && (rnd.nextInt(keyCount) % 2 == 0)) {
//repeat this key
- for(int j = 0; j < repeatCount; j++) {
+ for (int j = 0; j < repeatCount; j++) {
IntWritable key = new IntWritable(rnd.nextInt(keyCount));
LongWritable value = new LongWritable(System.nanoTime());
dataSet.put(key.get(), value.get());
@@ -234,7 +717,6 @@ public class TestTezMerger {
return dataSet;
}
-
private static class Reporter implements Progressable {
@Override
public void progress() {
[6/8] tez git commit: TEZ-2023. Refactor logIndividualFetchComplete()
to be common for both shuffle-schedulers (rbalamohan)
Posted by ss...@apache.org.
TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5cf9105f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5cf9105f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5cf9105f
Branch: refs/heads/TEZ-2003
Commit: 5cf9105fe47bb07aa42f5b3132ba13e81fe205a8
Parents: 7096d8a
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Thu Feb 5 08:25:24 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Thu Feb 5 08:25:24 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../library/common/shuffle/HttpConnection.java | 1 +
.../library/common/shuffle/ShuffleUtils.java | 39 ++++++++++++++++++++
.../common/shuffle/impl/ShuffleManager.java | 19 +---------
.../orderedgrouped/ShuffleScheduler.java | 21 ++---------
5 files changed, 47 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 39d7f81..6a494ca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers.
TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo.
TEZ-1999. IndexOutOfBoundsException during merge.
TEZ-2000. Source vertex exists error during DAG submission.
http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
index 4732a5a..1a5de41 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
@@ -237,6 +237,7 @@ public class HttpConnection {
}
// verify that replyHash is HMac of encHash
SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
+ //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host
LOG.info("for url=" + url +
" sent hash and receievd reply " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 629bab8..af02f9e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
+import java.text.DecimalFormat;
import java.util.List;
import javax.crypto.SecretKey;
@@ -50,6 +51,14 @@ public class ShuffleUtils {
private static final Log LOG = LogFactory.getLog(ShuffleUtils.class);
public static final String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
+ static final ThreadLocal<DecimalFormat> MBPS_FORMAT =
+ new ThreadLocal<DecimalFormat>() {
+ @Override
+ protected DecimalFormat initialValue() {
+ return new DecimalFormat("0.00");
+ }
+ };
+
public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
throws IOException {
DataInputByteBuffer in = new DataInputByteBuffer();
@@ -233,5 +242,35 @@ public class ShuffleUtils {
sb.append("]");
return sb.toString();
}
+
+ /**
+ * Log individual fetch complete event.
+ * This log information would be used by tez-tool/perf-analyzer/shuffle tools for mining
+ * - amount of data transferred between source to destination machine
+ * - time taken to transfer data between source to destination machine
+ * - details on DISK/DISK_DIRECT/MEMORY based shuffles
+ *
+ * @param log
+ * @param millis
+ * @param bytesCompressed
+ * @param bytesDecompressed
+ * @param outputType
+ * @param srcAttemptIdentifier
+ */
+ public static void logIndividualFetchComplete(Log log, long millis, long
+ bytesCompressed,
+ long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) {
+ double rate = 0;
+ if (millis != 0) {
+ rate = bytesCompressed / ((double) millis / 1000);
+ rate = rate / (1024 * 1024);
+ }
+ log.info(
+ "Completed fetch for attempt: "
+ + srcAttemptIdentifier + " to " + outputType +
+ ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed +
+ ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
+ MBPS_FORMAT.get().format(rate) + " MB/s");
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 13296c7..3dc8156 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -519,8 +519,8 @@ public class ShuffleManager implements FetcherCallback {
if (!completedInputSet.contains(inputIdentifier)) {
fetchedInput.commit();
committed = true;
- logIndividualFetchComplete(copyDuration, fetchedBytes, decompressedLength, fetchedInput,
- srcAttemptIdentifier);
+ ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration,
+ fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
// Processing counters for completed and commit fetches only. Need
// additional counters for excessive fetches - which primarily comes
@@ -731,22 +731,7 @@ public class ShuffleManager implements FetcherCallback {
+ mbpsFormat.format(transferRate) + " MB/s)");
}
- private void logIndividualFetchComplete(long millis, long fetchedBytes, long decompressedLength,
- FetchedInput fetchedInput,
- InputAttemptIdentifier srcAttemptIdentifier) {
- double rate = 0;
- if (millis != 0) {
- rate = fetchedBytes / ((double) millis / 1000);
- rate = rate / (1024 * 1024);
- }
- LOG.info(
- "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType() +
- ", CompressedSize=" + fetchedBytes + ", DecompressedSize=" + decompressedLength +
- ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
- mbpsFormat.format(rate) + " MB/s");
- }
-
private class SchedulerFutureCallback implements FutureCallback<Void> {
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 066b94a..57e904b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -47,6 +47,7 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
import com.google.common.collect.Lists;
@@ -103,7 +104,7 @@ class ShuffleScheduler {
private long totalBytesShuffledTillNow = 0;
private DecimalFormat mbpsFormat = new DecimalFormat("0.00");
-
+
public ShuffleScheduler(InputContext inputContext,
Configuration conf,
int numberOfInputs,
@@ -187,8 +188,8 @@ class ShuffleScheduler {
}
output.commit();
- logIndividualFetchComplete(millis, bytesCompressed, bytesDecompressed, output,
- srcAttemptIdentifier);
+ ShuffleUtils.logIndividualFetchComplete(LOG, millis, bytesCompressed,
+ bytesDecompressed, output.getType().toString(), srcAttemptIdentifier);
if (output.getType() == Type.DISK) {
bytesShuffledToDisk.increment(bytesCompressed);
} else if (output.getType() == Type.DISK_DIRECT) {
@@ -234,20 +235,6 @@ class ShuffleScheduler {
// TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation.
}
- private void logIndividualFetchComplete(long millis, long bytesCompressed, long bytesDecompressed,
- MapOutput output,
- InputAttemptIdentifier srcAttemptIdentifier) {
- double rate = 0;
- if (millis != 0) {
- rate = bytesCompressed / ((double) millis / 1000);
- rate = rate / (1024 * 1024);
- }
- LOG.info(
- "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + output.getType() +
- ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed +
- ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
- mbpsFormat.format(rate) + " MB/s");
- }
private void logProgress() {
double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
[7/8] tez git commit: TEZ-2020. For 1-1 edge vertex configured event
may be sent incorrectly (bikas)
Posted by ss...@apache.org.
TEZ-2020. For 1-1 edge vertex configured event may be sent incorrectly (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b804b8f0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b804b8f0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b804b8f0
Branch: refs/heads/TEZ-2003
Commit: b804b8f03e9d7934202f3838841fe1abb416f480
Parents: 5cf9105
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 4 19:48:19 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 4 19:48:41 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../event/VertexEventOneToOneSourceSplit.java | 50 ---------
.../tez/dag/app/dag/event/VertexEventType.java | 1 -
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 111 +------------------
.../tez/dag/app/dag/impl/TestVertexImpl.java | 39 +++----
.../vertexmanager/InputReadyVertexManager.java | 80 ++++++++-----
.../TestInputReadyVertexManager.java | 64 ++++++-----
7 files changed, 108 insertions(+), 238 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6a494ca..2c54b4b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -176,6 +176,7 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-2020. For 1-1 edge vertex configured event may be sent incorrectly
TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria
before sending notification
TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism
http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java
deleted file mode 100644
index a7e580e..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.dag.event;
-
-import org.apache.tez.dag.records.TezVertexID;
-
-public class VertexEventOneToOneSourceSplit extends VertexEvent {
- final int numTasks;
- final TezVertexID originalSplitVertex;
- final TezVertexID senderVertex;
-
- public VertexEventOneToOneSourceSplit(TezVertexID vertexId,
- TezVertexID senderVertex,
- TezVertexID originalSplitVertex,
- int numTasks) {
- super(vertexId, VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT);
- this.numTasks = numTasks;
- this.senderVertex = senderVertex;
- this.originalSplitVertex = originalSplitVertex;
- }
-
- public int getNumTasks() {
- return numTasks;
- }
-
- public TezVertexID getOriginalSplitSource() {
- return originalSplitVertex;
- }
-
- public TezVertexID getSenderVertex() {
- return senderVertex;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 1d0222e..5eb4929 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -45,7 +45,6 @@ public enum VertexEventType {
V_MANAGER_USER_CODE_ERROR,
V_ROUTE_EVENT,
- V_ONE_TO_ONE_SOURCE_SPLIT,
//Producer: VertexInputInitializer
V_ROOT_INPUT_INITIALIZED,
http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 9deccd2..865b182 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -116,7 +116,6 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
-import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
@@ -343,10 +342,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
new RootInputInitializedTransition())
.addTransition(VertexState.INITIALIZING,
- EnumSet.of(VertexState.INITIALIZING),
- VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
- new OneToOneSourceSplitTransition())
- .addTransition(VertexState.INITIALIZING,
EnumSet.of(VertexState.INITED, VertexState.FAILED),
VertexEventType.V_READY_TO_INIT,
new VertexInitializedTransition())
@@ -400,10 +395,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
new SourceVertexStartedTransition())
.addTransition(VertexState.INITED,
EnumSet.of(VertexState.INITED),
- VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
- new OneToOneSourceSplitTransition())
- .addTransition(VertexState.INITED,
- EnumSet.of(VertexState.INITED),
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition(VertexState.INITED,
@@ -443,10 +434,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexState.ERROR),
VertexEventType.V_TASK_COMPLETED,
new TaskCompletedTransition())
- .addTransition(VertexState.RUNNING,
- EnumSet.of(VertexState.RUNNING),
- VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
- new OneToOneSourceSplitTransition())
.addTransition(VertexState.RUNNING, VertexState.TERMINATING,
VertexEventType.V_TERMINATE,
new VertexKilledTransition())
@@ -546,7 +533,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED,
- VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_NULL_EDGE_INITIALIZED,
@@ -569,7 +555,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_TASK_RESCHEDULED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
- VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
@@ -590,7 +575,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_MANAGER_USER_CODE_ERROR,
VertexEventType.V_TASK_COMPLETED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
- VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_RESCHEDULED,
VertexEventType.V_INTERNAL_ERROR,
@@ -692,7 +676,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private final UserGroupInformation dagUgi;
private boolean parallelismSet = false;
- private TezVertexID originalOneToOneSplitSource = null;
private AtomicBoolean committed = new AtomicBoolean(false);
private AtomicBoolean aborted = new AtomicBoolean(false);
@@ -1497,21 +1480,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
- for (Map.Entry<Vertex, Edge> entry : targetVertices.entrySet()) {
- Edge edge = entry.getValue();
- if (edge.getEdgeProperty().getDataMovementType()
- == DataMovementType.ONE_TO_ONE) {
- // inform these target vertices that we have changed parallelism
- VertexEventOneToOneSourceSplit event =
- new VertexEventOneToOneSourceSplit(entry.getKey().getVertexId(),
- getVertexId(),
- ((originalOneToOneSplitSource!=null) ?
- originalOneToOneSplitSource : getVertexId()),
- numTasks);
- getEventHandler().handle(event);
- }
- }
-
} finally {
writeLock.unlock();
}
@@ -1532,21 +1500,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public void vertexReconfigurationPlanned() {
- vertexReconfigurationPlanned(false);
- }
-
- public void vertexReconfigurationPlanned(boolean testOverride) {
writeLock.lock();
try {
- if (testOverride) {
- Preconditions.checkState(vmIsInitialized.get() && completelyConfiguredSent.get(),
- "test should override only failed cases");
- } else {
- Preconditions.checkState(!vmIsInitialized.get(),
- "context.vertexReconfigurationPlanned() cannot be called after initialize()");
- Preconditions.checkState(!completelyConfiguredSent.get(), "vertexReconfigurationPlanned() "
- + " cannot be invoked after the vertex has been configured.");
- }
+ Preconditions.checkState(!vmIsInitialized.get(),
+ "context.vertexReconfigurationPlanned() cannot be called after initialize()");
+ Preconditions.checkState(!completelyConfiguredSent.get(), "vertexReconfigurationPlanned() "
+ + " cannot be invoked after the vertex has been configured.");
this.vertexToBeReconfiguredByManager = true;
} finally {
writeLock.unlock();
@@ -3147,68 +3106,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
- public static class OneToOneSourceSplitTransition implements
- MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
-
- @Override
- public VertexState transition(VertexImpl vertex, VertexEvent event) {
- VertexEventOneToOneSourceSplit splitEvent =
- (VertexEventOneToOneSourceSplit)event;
- TezVertexID originalSplitSource = splitEvent.getOriginalSplitSource();
-
- if (vertex.originalOneToOneSplitSource != null) {
- VertexState state = vertex.getState();
- Preconditions
- .checkState(
- (state == VertexState.INITIALIZING
- || state == VertexState.INITED || state == VertexState.RUNNING),
- " Unexpected 1-1 split for vertex " + vertex.getLogIdentifier()
- + " in state " + vertex.getState() + " . Split in vertex "
- + originalSplitSource + " sent by vertex "
- + splitEvent.getSenderVertex() + " numTasks "
- + splitEvent.getNumTasks());
- if (vertex.originalOneToOneSplitSource.equals(originalSplitSource)) {
- // ignore another split event that may have come from a different
- // path in the DAG. We have already split because of that source
- LOG.info("Ignoring split of vertex " + vertex.getLogIdentifier() +
- " because of split in vertex " + originalSplitSource +
- " sent by vertex " + splitEvent.getSenderVertex() +
- " numTasks " + splitEvent.getNumTasks());
- return state;
- }
- // cannot split from multiple sources
- throw new TezUncheckedException("Vertex: " + vertex.getLogIdentifier() +
- " asked to split by: " + originalSplitSource +
- " but was already split by:" + vertex.originalOneToOneSplitSource);
- }
-
- LOG.info("Splitting vertex " + vertex.getLogIdentifier() +
- " because of split in vertex " + originalSplitSource +
- " sent by vertex " + splitEvent.getSenderVertex() +
- " numTasks " + splitEvent.getNumTasks());
- vertex.originalOneToOneSplitSource = originalSplitSource;
- try {
- vertex.setParallelism(splitEvent.getNumTasks(), null, null, null, false);
- } catch (Exception e) {
- // ingore this exception, should not happen
- LOG.error("Unexpected exception, Just set Parallelims to a specified value, not involve EdgeManager,"
- + "exception should not happen here", e);
- }
- if (vertex.getState() == VertexState.RUNNING ||
- vertex.getState() == VertexState.INITED) {
- return vertex.getState();
- } else {
- Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
- " Unexpected 1-1 split for vertex " + vertex.getLogIdentifier() +
- " in state " + vertex.getState() +
- " . Split in vertex " + originalSplitSource +
- " sent by vertex " + splitEvent.getSenderVertex() +
- " numTasks " + splitEvent.getNumTasks());
- return vertex.getState();
- }
- }
- }
-
// Temporary to maintain topological order while starting vertices. Not useful
// since there's not much difference between the INIT and RUNNING states.
public static class SourceVertexStartedTransition implements
http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index e94bb17..83a3a8a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2447,14 +2447,14 @@ public class TestVertexImpl {
@Test(timeout = 5000)
public void testVertexSetParallelism() throws Exception {
- initAllVertices(VertexState.INITED);
VertexImpl v3 = vertices.get("vertex3");
+ v3.vertexReconfigurationPlanned();
+ initAllVertices(VertexState.INITED);
Assert.assertEquals(2, v3.getTotalTasks());
Map<TezTaskID, Task> tasks = v3.getTasks();
Assert.assertEquals(2, tasks.size());
TezTaskID firstTask = tasks.keySet().iterator().next();
- v3.vertexReconfigurationPlanned(true);
VertexImpl v1 = vertices.get("vertex1");
startVertex(vertices.get("vertex2"));
startVertex(v1);
@@ -2477,13 +2477,13 @@ public class TestVertexImpl {
@Test(timeout = 5000)
public void testVertexSetParallelismIncreaseException() throws Exception {
- initAllVertices(VertexState.INITED);
VertexImpl v3 = vertices.get("vertex3");
+ v3.vertexReconfigurationPlanned();
+ initAllVertices(VertexState.INITED);
Assert.assertEquals(2, v3.getTotalTasks());
Map<TezTaskID, Task> tasks = v3.getTasks();
Assert.assertEquals(2, tasks.size());
- v3.vertexReconfigurationPlanned(true);
VertexImpl v1 = vertices.get("vertex1");
startVertex(vertices.get("vertex2"));
startVertex(v1);
@@ -2500,13 +2500,13 @@ public class TestVertexImpl {
@Test(timeout = 5000)
public void testVertexSetParallelismMultipleException() throws Exception {
- initAllVertices(VertexState.INITED);
VertexImpl v3 = vertices.get("vertex3");
+ v3.vertexReconfigurationPlanned();
+ initAllVertices(VertexState.INITED);
Assert.assertEquals(2, v3.getTotalTasks());
Map<TezTaskID, Task> tasks = v3.getTasks();
Assert.assertEquals(2, tasks.size());
- v3.vertexReconfigurationPlanned(true);
VertexImpl v1 = vertices.get("vertex1");
startVertex(vertices.get("vertex2"));
startVertex(v1);
@@ -2561,6 +2561,8 @@ public class TestVertexImpl {
@Test(timeout = 5000)
public void testSetCustomEdgeManager() throws Exception {
+ VertexImpl v5 = vertices.get("vertex5"); // Vertex5 linked to v3 (v3 src, v5 dest)
+ v5.vertexReconfigurationPlanned();
initAllVertices(VertexState.INITED);
Edge edge = edges.get("e4");
EdgeManagerPlugin em = edge.getEdgeManager();
@@ -2574,10 +2576,7 @@ public class TestVertexImpl {
edgeManagerDescriptor.setUserPayload(userPayload);
Vertex v3 = vertices.get("vertex3");
- VertexImpl v5 = vertices.get("vertex5"); // Vertex5 linked to v3 (v3 src, v5
- // dest)
- v5.vertexReconfigurationPlanned(true);
Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors =
Collections.singletonMap(v3.getName(), edgeManagerDescriptor);
v5.setParallelism(v5.getTotalTasks() - 1, null, edgeManagerDescriptors, null, true); // Must decrease.
@@ -3396,25 +3395,23 @@ public class TestVertexImpl {
Assert.assertEquals(v1Hints.get(i), v1.getTaskLocationHints()[i]);
}
Assert.assertEquals(true, initializerManager1.hasShutDown);
-
+
+ startVertex(v1);
+
Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
- Assert.assertEquals(VertexState.INITED, vertices.get("vertex2").getState());
Assert.assertEquals(numTasks, vertices.get("vertex3").getTotalTasks());
- Assert.assertEquals(VertexState.INITED, vertices.get("vertex3").getState());
Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks());
- Assert.assertEquals(VertexState.INITED, vertices.get("vertex5").getState());
- // v5, v6 still initializing since edge is null
- Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
+ // v4, v6 still initializing since edge is null
Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
+ Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex6").getState());
- startVertex(v1);
Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex1").getState());
Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex2").getState());
Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex5").getState());
- // v5, v6 still initializing since edge is null
- Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
+ // v4, v6 still initializing since edge is null
Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
+ Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex6").getState());
mockEdgeManagerDescriptor =
EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
@@ -3423,7 +3420,7 @@ public class TestVertexImpl {
e.setCustomEdgeManager(mockEdgeManagerDescriptor);
dispatcher.await();
Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
- Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
+ Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex6").getState());
}
@Test(timeout = 5000)
@@ -3434,6 +3431,7 @@ public class TestVertexImpl {
dagPlan = createDAGPlanForOneToOneSplit(null, numTasks, false);
setupPostDagCreation();
VertexImpl v1 = vertices.get("vertex1");
+ v1.vertexReconfigurationPlanned();
initAllVertices(VertexState.INITED);
// fudge vertex manager so that tasks dont start running
@@ -3441,7 +3439,6 @@ public class TestVertexImpl {
VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()),
v1, appContext, mock(StateChangeNotifier.class));
v1.vertexManager.initialize();
- v1.vertexReconfigurationPlanned(true);
startVertex(v1);
Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
@@ -3473,6 +3470,7 @@ public class TestVertexImpl {
dagPlan = createDAGPlanForOneToOneSplit(null, numTasks, false);
setupPostDagCreation();
VertexImpl v1 = vertices.get("vertex1");
+ v1.vertexReconfigurationPlanned();
initAllVertices(VertexState.INITED);
// fudge vertex manager so that tasks dont start running
@@ -3486,7 +3484,6 @@ public class TestVertexImpl {
Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks());
// change parallelism
int newNumTasks = 3;
- v1.vertexReconfigurationPlanned(true);
v1.setParallelism(newNumTasks, null, null, null, true);
v1.doneReconfiguringVertex();
dispatcher.await();
http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index f5c187e..e2e9dd3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -55,8 +56,10 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
int oneToOneSrcTasksDoneCount[];
TaskLocationHint oneToOneLocationHints[];
int numOneToOneEdges;
- int numSignalsToWaitFor;
+ int numConfiguredSources;
Multimap<String, Integer> pendingCompletions = LinkedListMultimap.create();
+ AtomicBoolean configured;
+ AtomicBoolean started;
public InputReadyVertexManager(VertexManagerPluginContext context) {
super(context);
@@ -76,13 +79,10 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
}
}
- void start() {
- if (!ready()) {
- return;
- }
+ private void configure() {
+ Preconditions.checkState(!configured.get(), "Vertex: " + getContext().getVertexName());
int numManagedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
LOG.info("Managing " + numManagedTasks + " tasks for vertex: " + getContext().getVertexName());
- taskIsStarted = new boolean[numManagedTasks];
// find out about all input edge types. If there is a custom edge then
// TODO Until TEZ-1013 we cannot handle custom input formats
@@ -116,32 +116,51 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
}
if (numOneToOneEdges > 0) {
+ Preconditions
+ .checkState(oneToOneSrcTaskCount >= 0, "Vertex: " + getContext().getVertexName());
if (oneToOneSrcTaskCount != numManagedTasks) {
- throw new TezUncheckedException(
- "Managed task number must equal 1-1 source task number");
+ numManagedTasks = oneToOneSrcTaskCount;
+ // must change parallelism to make them the same
+ LOG.info("Update parallelism of vertex: " + getContext().getVertexName() +
+ " to " + oneToOneSrcTaskCount + " to match source 1-1 vertices.");
+ getContext().setVertexParallelism(oneToOneSrcTaskCount, null, null, null);
}
oneToOneSrcTasksDoneCount = new int[oneToOneSrcTaskCount];
oneToOneLocationHints = new TaskLocationHint[oneToOneSrcTaskCount];
}
+
+ Preconditions.checkState(numManagedTasks >=0, "Vertex: " + getContext().getVertexName());
+ taskIsStarted = new boolean[numManagedTasks];
- for (Map.Entry<String, Collection<Integer>> entry : pendingCompletions.asMap().entrySet()) {
- for (Integer task : entry.getValue()) {
- handleSourceTaskFinished(entry.getKey(), task);
- }
- }
+ // allow scheduling
+ configured.set(true);
+ getContext().doneReconfiguringVertex();
+ trySchedulingPendingCompletions();
}
- boolean ready() {
- int target = getContext().getInputVertexEdgeProperties().size() + 1;
- Preconditions.checkState(numSignalsToWaitFor <= target);
- return (numSignalsToWaitFor == target);
+ private boolean readyToSchedule() {
+ return (configured.get() && started.get());
+ }
+
+ private void trySchedulingPendingCompletions() {
+ if (readyToSchedule() && !pendingCompletions.isEmpty()) {
+ for (Map.Entry<String, Collection<Integer>> entry : pendingCompletions.asMap().entrySet()) {
+ for (Integer i : entry.getValue()) {
+ onSourceTaskCompleted(entry.getKey(), i);
+ }
+ }
+ }
}
@Override
public void initialize() {
+ // this will prevent vertex from starting until we notify we are done
+ getContext().vertexReconfigurationPlanned();
Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
// wait for sources and self to start
- numSignalsToWaitFor = 0;
+ numConfiguredSources = 0;
+ configured = new AtomicBoolean(false);
+ started = new AtomicBoolean(false);
for (String entry : edges.keySet()) {
getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED));
}
@@ -149,24 +168,33 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
@Override
public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
- numSignalsToWaitFor++;
- LOG.info("Received configured signal from: " + stateUpdate.getVertexName() +
- " numConfiguredSources: " + numSignalsToWaitFor);
- start();
+ numConfiguredSources++;
+ int target = getContext().getInputVertexEdgeProperties().size();
+ LOG.info("For vertex: " + getContext().getVertexName() + "Received configured signal from: "
+ + stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources
+ + " needed: " + target);
+ Preconditions.checkState(numConfiguredSources <= target, "Vertex: " + getContext().getVertexName());
+ if (numConfiguredSources == target) {
+ configure();
+ }
}
@Override
public synchronized void onVertexStarted(Map<String, List<Integer>> completions) {
for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
pendingCompletions.putAll(entry.getKey(), entry.getValue());
- }
- numSignalsToWaitFor++;
- start();
+ }
+
+ // allow scheduling
+ started.set(true);
+
+ trySchedulingPendingCompletions();
}
@Override
public synchronized void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
- if (ready()) {
+ if (readyToSchedule()) {
+ // configured and started. try to schedule
handleSourceTaskFinished(srcVertexName, taskId);
} else {
pendingCompletions.put(srcVertexName, taskId);
http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index 9a83a51..8de747d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -31,6 +31,7 @@ import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
@@ -82,8 +83,11 @@ public class TestInputReadyVertexManager {
InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
manager.initialize();
- // first source vertex configured
+ verify(mockContext, times(1)).vertexReconfigurationPlanned();
+ // source vertex configured
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ verify(mockContext, times(1)).doneReconfiguringVertex();
+ verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
// then own vertex started
manager.onVertexStarted(initialCompletions);
manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
@@ -119,10 +123,12 @@ public class TestInputReadyVertexManager {
InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
manager.initialize();
- // first own vertex started
- manager.onVertexStarted(initialCompletions);
- // then source vertex configured
+ verify(mockContext, times(1)).vertexReconfigurationPlanned();
+ // source vertex configured
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ verify(mockContext, times(1)).doneReconfiguringVertex();
+ verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+ manager.onVertexStarted(initialCompletions);
verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
@@ -174,17 +180,19 @@ public class TestInputReadyVertexManager {
InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
manager.initialize();
- // first own vertex started
- manager.onVertexStarted(initialCompletions);
+ verify(mockContext, times(1)).vertexReconfigurationPlanned();
verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+ // ok to have source task complete before anything else
manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+ // first own vertex started
+ manager.onVertexStarted(initialCompletions);
+ // no scheduling as we are not configured yet
verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
// then source vertex configured. now we start
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ verify(mockContext, times(1)).doneReconfiguringVertex();
+
verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
- Assert.assertEquals(2, requestCaptor.getAllValues().size());
- Assert.assertEquals(1, requestCaptor.getValue().size());
- Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
@@ -247,58 +255,48 @@ public class TestInputReadyVertexManager {
Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
- // 1-1 sources do not match managed tasks before vertex started
+ // 1-1 sources do not match managed tasks. setParallelism called to make them match
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
manager.initialize();
+ verify(mockContext, times(1)).vertexReconfigurationPlanned();
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
- try {
- manager.onVertexStarted(initialCompletions);
- Assert.assertTrue("Should have exception", false);
- } catch (TezUncheckedException e) {
- e.getMessage().contains("Managed task number must equal 1-1 source");
- }
-
- // 1-1 sources do not match managed tasks after vertex started
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
- manager = new InputReadyVertexManager(mockContext);
- manager.initialize();
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+ verify(mockContext, times(1)).setVertexParallelism(3, null, null, null);
+ verify(mockContext, times(1)).doneReconfiguringVertex();
manager.onVertexStarted(initialCompletions);
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
- try {
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
- Assert.assertTrue("Should have exception", false);
- } catch (TezUncheckedException e) {
- e.getMessage().contains("Managed task number must equal 1-1 source");
- }
// 1-1 sources do not match
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(4);
manager = new InputReadyVertexManager(mockContext);
manager.initialize();
+ verify(mockContext, times(2)).vertexReconfigurationPlanned();
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
try {
- manager.onVertexStarted(initialCompletions);
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertTrue("Should have exception", false);
} catch (TezUncheckedException e) {
e.getMessage().contains("1-1 source vertices must have identical concurrency");
}
+ verify(mockContext, times(1)).setVertexParallelism(anyInt(), (VertexLocationHint) any(),
+ anyMap(), anyMap()); // not invoked
+
+ when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
initialCompletions.put(mockSrcVertexId2, Collections.singletonList(0));
- when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
manager = new InputReadyVertexManager(mockContext);
manager.initialize();
+ verify(mockContext, times(3)).vertexReconfigurationPlanned();
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+ verify(mockContext, times(1)).setVertexParallelism(anyInt(), (VertexLocationHint) any(),
+ anyMap(), anyMap()); // not invoked
+ verify(mockContext, times(2)).doneReconfiguringVertex();
manager.onVertexStarted(initialCompletions);
// all 1-1 0's done but not scheduled because v1 is not done
manager.onSourceTaskCompleted(mockSrcVertexId3, 0);
[8/8] tez git commit: TEZ-2019. Temporarily allow the scheduler and
launcher to be specified via configuration. (sseth)
Posted by ss...@apache.org.
TEZ-2019. Temporarily allow the scheduler and launcher to be specified
via configuration. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f49c054c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f49c054c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f49c054c
Branch: refs/heads/TEZ-2003
Commit: f49c054c95e0d3ff6d72e681da526aa95b64b300
Parents: b804b8f
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Jan 30 16:02:32 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Feb 5 12:59:53 2015 -0800
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 4 +++
.../apache/tez/dag/api/TezConfiguration.java | 2 ++
.../org/apache/tez/dag/app/DAGAppMaster.java | 30 ++++++++++++++++-
.../dag/app/rm/TaskSchedulerEventHandler.java | 35 ++++++++++++++++++--
.../org/apache/tez/runtime/task/TezChild.java | 3 +-
5 files changed, 70 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f49c054c/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
new file mode 100644
index 0000000..1822fcb
--- /dev/null
+++ b/TEZ-2003-CHANGES.txt
@@ -0,0 +1,4 @@
+ALL CHANGES:
+ TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration.
+
+INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/f49c054c/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 43307cb..d85e19a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -972,4 +972,6 @@ public class TezConfiguration extends Configuration {
+ "allow.disabled.timeline-domains";
public static final boolean TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT = false;
+ public static final String TEZ_AM_CONTAINER_LAUNCHER_CLASS = TEZ_AM_PREFIX + "container-launcher.class";
+ public static final String TEZ_AM_TASK_SCHEDULER_CLASS = TEZ_AM_PREFIX + "task-scheduler.class";
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f49c054c/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index e1ab3b7..ac0e960 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -27,6 +27,8 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -89,6 +91,7 @@ import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.AsyncDispatcher;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezConverterUtils;
import org.apache.tez.common.TezUtilsInternal;
@@ -917,9 +920,34 @@ public class DAGAppMaster extends AbstractService {
protected ContainerLauncher
createContainerLauncher(final AppContext context) throws UnknownHostException {
if(isLocal){
+ LOG.info("Creating LocalContainerLauncher");
return new LocalContainerLauncher(context, taskAttemptListener, workingDirectory);
} else {
- return new ContainerLauncherImpl(context);
+ // TODO: Temporary reflection with specific parameters until a clean interface is defined.
+ String containerLauncherClassName = getConfig().get(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS);
+ if (containerLauncherClassName == null) {
+ LOG.info("Creating Default Container Launcher");
+ return new ContainerLauncherImpl(context);
+ } else {
+ LOG.info("Creating container launcher : " + containerLauncherClassName);
+ Class<? extends ContainerLauncher> containerLauncherClazz = (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
+ containerLauncherClassName);
+ try {
+ Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
+ .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class);
+ ctor.setAccessible(true);
+ ContainerLauncher instance = ctor.newInstance(context, getConfig(), taskAttemptListener);
+ return instance;
+ } catch (NoSuchMethodException e) {
+ throw new TezUncheckedException(e);
+ } catch (InvocationTargetException e) {
+ throw new TezUncheckedException(e);
+ } catch (InstantiationException e) {
+ throw new TezUncheckedException(e);
+ } catch (IllegalAccessException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f49c054c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 625b09e..03037fa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -18,6 +18,8 @@
package org.apache.tez.dag.app.rm;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
@@ -28,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
@@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.TaskLocationHint;
@@ -315,12 +319,39 @@ public class TaskSchedulerEventHandler extends AbstractService
boolean isLocal = getConfig().getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
if (isLocal) {
+ LOG.info("Using TaskScheduler: LocalTaskSchedulerService");
return new LocalTaskSchedulerService(this, this.containerSignatureMatcher,
host, port, trackingUrl, appContext);
}
else {
- return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
- host, port, trackingUrl, appContext);
+ String schedulerClassName = getConfig().get(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS);
+ if (schedulerClassName == null) {
+ LOG.info("Using TaskScheduler: YarnTaskSchedulerService");
+ return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
+ host, port, trackingUrl, appContext);
+ } else {
+ LOG.info("Using custom TaskScheduler: " + schedulerClassName);
+ // TODO Temporary reflection with specific parameters. Remove once there is a clean interface.
+ Class<? extends TaskSchedulerService> taskSchedulerClazz =
+ (Class<? extends TaskSchedulerService>) ReflectionUtils.getClazz(schedulerClassName);
+ try {
+ Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
+ .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
+ Integer.class, String.class, Configuration.class);
+ ctor.setAccessible(true);
+ TaskSchedulerService taskSchedulerService =
+ ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig());
+ return taskSchedulerService;
+ } catch (NoSuchMethodException e) {
+ throw new TezUncheckedException(e);
+ } catch (InvocationTargetException e) {
+ throw new TezUncheckedException(e);
+ } catch (InstantiationException e) {
+ throw new TezUncheckedException(e);
+ } catch (IllegalAccessException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f49c054c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index d537846..6164e52 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -356,7 +356,8 @@ public class TezChild {
DefaultMetricsSystem.shutdown();
if (!isLocal) {
RPC.stopProxy(umbilical);
- LogManager.shutdown();
+ // TODO Temporary change. Revert. Ideally, move this over to the main method in TezChild if possible.
+// LogManager.shutdown();
}
}
}
[5/8] tez git commit: TEZ-2015. VertexImpl.doneReconfiguringVertex()
should check other criteria before sending notification (bikas)
Posted by ss...@apache.org.
TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria before sending notification (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7096d8a8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7096d8a8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7096d8a8
Branch: refs/heads/TEZ-2003
Commit: 7096d8a8f409e3db4f568230a959432e2c9cbb78
Parents: cfa637a
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 4 11:10:23 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 4 11:10:23 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 12 ++--
.../tez/dag/app/dag/impl/TestVertexImpl.java | 72 ++++++++++++++++++++
3 files changed, 80 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/7096d8a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 24c5b32..39d7f81 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -175,6 +175,8 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria
+ before sending notification
TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism
TEZ-1934. TestAMRecovery may fail due to the execution order is not determined.
TEZ-1642. TestAMRecovery sometimes fail.
http://git-wip-us.apache.org/repos/asf/tez/blob/7096d8a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c3f4ae7..9deccd2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1560,11 +1560,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
Preconditions.checkState(vertexToBeReconfiguredByManager, "doneReconfiguringVertex() can be "
+ "invoked only after vertexReconfigurationPlanned() is invoked");
this.vertexToBeReconfiguredByManager = false;
- // TEZ-2015 VM may not have configured everything eg. input edge. maybeSendConfiguredEvent()
- if (completelyConfiguredSent.compareAndSet(false, true)) {
- // vertex already started and at that time this event was not sent. Send now.
- stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertexName,
- org.apache.tez.dag.api.event.VertexState.CONFIGURED));
+ if (canInitVertex()) {
+ maybeSendConfiguredEvent();
+ } else {
+ Preconditions.checkState(getInternalState() == VertexState.INITIALIZING, "Vertex: "
+ + getLogIdentifier());
}
} finally {
writeLock.unlock();
@@ -3302,7 +3302,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private void maybeSendConfiguredEvent() {
// the vertex is fully configured by the time it starts. Always notify completely configured
// unless the vertex manager has told us that it is going to reconfigure it further
- Preconditions.checkState(canInitVertex());
+ Preconditions.checkState(canInitVertex(), "Vertex: " + getLogIdentifier());
if (!this.vertexToBeReconfiguredByManager) {
// this vertex will not be reconfigured by its manager
if (completelyConfiguredSent.compareAndSet(false, true)) {
http://git-wip-us.apache.org/repos/asf/tez/blob/7096d8a8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 645708e..e94bb17 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -4949,6 +4949,78 @@ public class TestVertexImpl {
Assert.assertNotNull(vB.getTask(0));
Assert.assertNotNull(vC.getTask(0));
}
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testVertexConfiguredDoneByVMBeforeEdgeDefined() throws Exception {
+ // Race when a source vertex manages to start before the target vertex has
+ // been initialized
+ setupPreDagCreation();
+ dagPlan = createSamplerDAGPlan(true);
+ setupPostDagCreation();
+
+ VertexImpl vA = vertices.get("A");
+ VertexImpl vB = vertices.get("B");
+ VertexImpl vC = vertices.get("C");
+
+ TestUpdateListener listener = new TestUpdateListener();
+ updateTracker
+ .registerForVertexUpdates(vB.getName(),
+ EnumSet.of(org.apache.tez.dag.api.event.VertexState.CONFIGURED),
+ listener);
+
+ // fudge the vm so we can do custom stuff
+ vB.vertexManager = new VertexManager(
+ VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()),
+ vB, appContext, mock(StateChangeNotifier.class));
+
+ vB.vertexReconfigurationPlanned();
+
+ dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+ VertexEventType.V_INIT));
+ dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+ VertexEventType.V_START));
+
+ dispatcher.await();
+ Assert.assertEquals(VertexState.INITIALIZING, vA.getState());
+ Assert.assertEquals(VertexState.INITIALIZING, vB.getState());
+ Assert.assertEquals(VertexState.INITIALIZING, vC.getState());
+
+ // setting the edge manager should cause vA to start
+ EdgeManagerPluginDescriptor mockEdgeManagerDescriptor =
+ EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
+ Edge e = vC.sourceVertices.get(vA);
+ Assert.assertNull(e.getEdgeManager());
+ e.setCustomEdgeManager(mockEdgeManagerDescriptor);
+ dispatcher.await();
+ Assert.assertEquals(VertexState.RUNNING, vA.getState());
+ Assert.assertEquals(VertexState.INITIALIZING, vB.getState());
+ Assert.assertEquals(VertexState.INITIALIZING, vC.getState());
+
+ // vB is not configured yet. Edge to C is not configured. So it should not send configured event
+ // even though the VM says it is done reconfiguring the vertex
+ vB.doneReconfiguringVertex();
+ Assert.assertEquals(0, listener.events.size());
+
+ // complete configuration and verify getting configured signal from vB
+ Map<String, EdgeManagerPluginDescriptor> edges = Maps.newHashMap();
+ edges.put("B", mockEdgeManagerDescriptor);
+ vC.setParallelism(2, vertexLocationHint, edges, null, true);
+
+ dispatcher.await();
+ Assert.assertEquals(1, listener.events.size());
+ Assert.assertEquals(vB.getName(), listener.events.get(0).getVertexName());
+ Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.CONFIGURED,
+ listener.events.get(0).getVertexState());
+ updateTracker.unregisterForVertexUpdates(vB.getName(), listener);
+
+ Assert.assertEquals(VertexState.RUNNING, vA.getState());
+ Assert.assertEquals(VertexState.RUNNING, vB.getState());
+ Assert.assertEquals(VertexState.RUNNING, vC.getState());
+ Assert.assertNotNull(vA.getTask(0));
+ Assert.assertNotNull(vB.getTask(0));
+ Assert.assertNotNull(vC.getTask(0));
+ }
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
[2/8] tez git commit: TEZ-2011. InputReadyVertexManager not resilient
to updates in parallelism (bikas)
Posted by ss...@apache.org.
TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b7268699
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b7268699
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b7268699
Branch: refs/heads/TEZ-2003
Commit: b7268699e684fc4d1ebc00b47d8f1f7e4163bf97
Parents: 6ba1339
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Jan 30 17:46:27 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Jan 30 17:46:27 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 1 +
.../vertexmanager/InputReadyVertexManager.java | 66 ++++++++++++---
.../TestInputReadyVertexManager.java | 87 +++++++++++++++++++-
4 files changed, 140 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b7268699/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 43d009d..5c0bec0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -173,6 +173,7 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism
TEZ-1934. TestAMRecovery may fail due to the execution order is not determined.
TEZ-1642. TestAMRecovery sometimes fail.
TEZ-1931. Publish tez version info to Timeline.
http://git-wip-us.apache.org/repos/asf/tez/blob/b7268699/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index f26e4ae..577c98b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1560,6 +1560,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
Preconditions.checkState(vertexToBeReconfiguredByManager, "doneReconfiguringVertex() can be "
+ "invoked only after vertexReconfigurationPlanned() is invoked");
this.vertexToBeReconfiguredByManager = false;
+ // TEZ-2015 VM may not have configured everything eg. input edge. maybeSendConfiguredEvent()
if (completelyConfiguredSent.compareAndSet(false, true)) {
// vertex already started and at that time this event was not sent. Send now.
stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertexName,
http://git-wip-us.apache.org/repos/asf/tez/blob/b7268699/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index 11185ee..f5c187e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -18,6 +18,8 @@
package org.apache.tez.dag.library.vertexmanager;
+import java.util.Collection;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -32,11 +34,16 @@ import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
@Private
public class InputReadyVertexManager extends VertexManagerPlugin {
@@ -48,6 +55,8 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
int oneToOneSrcTasksDoneCount[];
TaskLocationHint oneToOneLocationHints[];
int numOneToOneEdges;
+ int numSignalsToWaitFor;
+ Multimap<String, Integer> pendingCompletions = LinkedListMultimap.create();
public InputReadyVertexManager(VertexManagerPluginContext context) {
super(context);
@@ -67,12 +76,10 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
}
}
- @Override
- public void initialize() {
- }
-
- @Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
+ void start() {
+ if (!ready()) {
+ return;
+ }
int numManagedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
LOG.info("Managing " + numManagedTasks + " tasks for vertex: " + getContext().getVertexName());
taskIsStarted = new boolean[numManagedTasks];
@@ -117,24 +124,61 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
oneToOneLocationHints = new TaskLocationHint[oneToOneSrcTaskCount];
}
- for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
+ for (Map.Entry<String, Collection<Integer>> entry : pendingCompletions.asMap().entrySet()) {
for (Integer task : entry.getValue()) {
handleSourceTaskFinished(entry.getKey(), task);
}
}
}
+
+ boolean ready() {
+ int target = getContext().getInputVertexEdgeProperties().size() + 1;
+ Preconditions.checkState(numSignalsToWaitFor <= target);
+ return (numSignalsToWaitFor == target);
+ }
+
+ @Override
+ public void initialize() {
+ Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+ // wait for sources to be configured and self to start
+ numSignalsToWaitFor = 0;
+ for (String entry : edges.keySet()) {
+ getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED));
+ }
+ }
+
+ @Override
+ public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+ numSignalsToWaitFor++;
+ LOG.info("Received configured signal from: " + stateUpdate.getVertexName() +
+ " numConfiguredSources: " + numSignalsToWaitFor);
+ start();
+ }
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
- handleSourceTaskFinished(srcVertexName, taskId);
+ public synchronized void onVertexStarted(Map<String, List<Integer>> completions) {
+ for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
+ pendingCompletions.putAll(entry.getKey(), entry.getValue());
+ }
+ numSignalsToWaitFor++;
+ start();
+ }
+
+ @Override
+ public synchronized void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+ if (ready()) {
+ handleSourceTaskFinished(srcVertexName, taskId);
+ } else {
+ pendingCompletions.put(srcVertexName, taskId);
+ }
}
@Override
- public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+ public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
}
@Override
- public void onRootVertexInitialized(String inputName,
+ public synchronized void onRootVertexInitialized(String inputName,
InputDescriptor inputDescriptor, List<Event> events) {
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b7268699/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index c6981ed..9a83a51 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -34,6 +34,8 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -55,7 +57,7 @@ public class TestInputReadyVertexManager {
}
@Test (timeout=5000)
- public void testBasicScatterGather() {
+ public void testBasicScatterGather() throws Exception {
HashMap<String, EdgeProperty> mockInputVertices =
new HashMap<String, EdgeProperty>();
String mockSrcVertexId1 = "Vertex1";
@@ -80,6 +82,9 @@ public class TestInputReadyVertexManager {
InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
manager.initialize();
+ // first source vertex configured
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ // then own vertex started
manager.onVertexStarted(initialCompletions);
manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
verify(mockContext, times(0)).scheduleVertexTasks(anyList());
@@ -89,7 +94,7 @@ public class TestInputReadyVertexManager {
}
@Test (timeout=5000)
- public void testBasicOneToOne() {
+ public void testBasicOneToOne() throws Exception {
HashMap<String, EdgeProperty> mockInputVertices =
new HashMap<String, EdgeProperty>();
String mockSrcVertexId1 = "Vertex1";
@@ -114,7 +119,10 @@ public class TestInputReadyVertexManager {
InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
manager.initialize();
+ // first own vertex started
manager.onVertexStarted(initialCompletions);
+ // then source vertex configured
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
@@ -139,9 +147,56 @@ public class TestInputReadyVertexManager {
Assert.assertEquals(2, requestCaptor.getValue().get(0)
.getTaskLocationHint().getAffinitizedTask().getTaskIndex());
}
+
+ @Test (timeout=5000)
+ public void testDelayedConfigureOneToOne() throws Exception {
+ HashMap<String, EdgeProperty> mockInputVertices =
+ new HashMap<String, EdgeProperty>();
+ String mockSrcVertexId1 = "Vertex1";
+ EdgeProperty eProp1 = EdgeProperty.create(
+ EdgeProperty.DataMovementType.ONE_TO_ONE,
+ EdgeProperty.DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("out"),
+ InputDescriptor.create("in"));
+
+ String mockManagedVertexId = "Vertex";
+
+ VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+ when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+ when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
+ mockInputVertices.put(mockSrcVertexId1, eProp1);
+
+ Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
+ initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
+
+ InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
+ manager.initialize();
+ // first own vertex started
+ manager.onVertexStarted(initialCompletions);
+ verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+ manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+ verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+ // then source vertex configured. now we start
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
+ Assert.assertEquals(2, requestCaptor.getAllValues().size());
+ Assert.assertEquals(1, requestCaptor.getValue().size());
+ Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+ manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
+ verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
+ Assert.assertEquals(1, requestCaptor.getValue().size());
+ Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+ Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getVertexName());
+ Assert.assertEquals(2, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getTaskIndex());
+ }
@Test (timeout=5000)
- public void testComplex() {
+ public void testComplex() throws Exception {
HashMap<String, EdgeProperty> mockInputVertices =
new HashMap<String, EdgeProperty>();
String mockSrcVertexId1 = "Vertex1";
@@ -192,22 +247,43 @@ public class TestInputReadyVertexManager {
Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
- // 1-1 sources do not match managed tasks
+ // 1-1 sources do not match managed tasks before vertex started
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
manager.initialize();
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
try {
manager.onVertexStarted(initialCompletions);
Assert.assertTrue("Should have exception", false);
} catch (TezUncheckedException e) {
e.getMessage().contains("Managed task number must equal 1-1 source");
}
+
+ // 1-1 sources do not match managed tasks after vertex started
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
+ manager = new InputReadyVertexManager(mockContext);
+ manager.initialize();
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+ manager.onVertexStarted(initialCompletions);
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
+ try {
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+ Assert.assertTrue("Should have exception", false);
+ } catch (TezUncheckedException e) {
+ e.getMessage().contains("Managed task number must equal 1-1 source");
+ }
// 1-1 sources do not match
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(4);
manager = new InputReadyVertexManager(mockContext);
manager.initialize();
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
try {
manager.onVertexStarted(initialCompletions);
Assert.assertTrue("Should have exception", false);
@@ -220,6 +296,9 @@ public class TestInputReadyVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
manager = new InputReadyVertexManager(mockContext);
manager.initialize();
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
manager.onVertexStarted(initialCompletions);
// all 1-1 0's done but not scheduled because v1 is not done
manager.onSourceTaskCompleted(mockSrcVertexId3, 0);