You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/05 22:00:45 UTC

[1/8] tez git commit: TEZ-1929. pre-empted tasks should be marked as killed instead of failed (bikas)

Repository: tez
Updated Branches:
  refs/heads/TEZ-2003 753b7efde -> f49c054c9 (forced update)


TEZ-1929. pre-empted tasks should be marked as killed instead of failed (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6ba1339d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6ba1339d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6ba1339d

Branch: refs/heads/TEZ-2003
Commit: 6ba1339d57a5d05fa14f35f352076131bffea483
Parents: 21bce95
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Jan 30 16:40:11 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Jan 30 16:40:11 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../rm/container/AMContainerEventCompleted.java |  5 +--
 .../dag/app/rm/container/AMContainerImpl.java   |  4 +--
 .../tez/dag/app/TestMockDAGAppMaster.java       | 38 +++++++++++++++++++-
 .../app/rm/TestTaskSchedulerEventHandler.java   |  2 +-
 .../dag/app/rm/container/TestAMContainer.java   |  6 ++--
 6 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3414adc..43d009d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -55,6 +55,7 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1929. pre-empted tasks should be marked as killed instead of failed
   TEZ-2017. TEZ UI - Dag view throwing error whild re-displaying additionals in some dags.
   TEZ-2013. TEZ UI - App Details Page UI Nits
   TEZ-2014. Tez UI: Nits : All tables, Vertices Page UI.

http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index a455f1e..9bb6d7f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -37,14 +37,15 @@ public class AMContainerEventCompleted extends AMContainerEvent {
   }
 
   public boolean isPreempted() {
-    return (exitStatus == ContainerExitStatus.PREEMPTED);
+    return (exitStatus == ContainerExitStatus.PREEMPTED || 
+        errCause == TaskAttemptTerminationCause.INTERNAL_PREEMPTION);
   }
   
   public boolean isDiskFailed() {
     return (exitStatus == ContainerExitStatus.DISKS_FAILED);
   }
   
-  public boolean isClusterAction() {
+  public boolean isSystemAction() {
     return isPreempted() || isDiskFailed();
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 536001c..5c5a8c5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -662,7 +662,7 @@ public class AMContainerImpl implements AMContainer {
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
       if (container.pendingAttempt != null) {
         String errorMessage = getMessage(container, event);
-        if (event.isClusterAction()) {
+        if (event.isSystemAction()) {
           container.sendContainerTerminatedBySystemToTaskAttempt(container.pendingAttempt,
               errorMessage, event.getTerminationCause());
         } else {
@@ -921,7 +921,7 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
-      if (event.isClusterAction()) {
+      if (event.isSystemAction()) {
         container.sendContainerTerminatedBySystemToTaskAttempt(container.runningAttempt,
             getMessage(container, event), event.getTerminationCause());
       } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index bed971a..a46821a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -37,10 +36,16 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
 import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
 import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -104,6 +109,37 @@ public class TestMockDAGAppMaster {
     tezClient.stop();
   }
   
+  @Test (timeout = 5000)
+  public void testInternalPreemption() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
+    tezClient.start();
+    
+    MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+    MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+    mockLauncher.startScheduling(false);
+    // there is only 1 task whose first attempt will be preempted
+    DAG dag = DAG.create("test");
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1);
+    dag.addVertex(vA);
+
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    mockLauncher.waitTillContainersLaunched();
+    ContainerData cData = mockLauncher.getContainers().values().iterator().next();
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+    mockApp.getTaskSchedulerEventHandler().preemptContainer(cData.cId);
+    
+    mockLauncher.startScheduling(true);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
+    TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
+    TaskAttempt killedTa = dagImpl.getVertex(vA.getName()).getTask(0).getAttempt(killedTaId);
+    Assert.assertEquals(TaskAttemptState.KILLED, killedTa.getState());
+    tezClient.stop();
+  }
+  
   @Test (timeout = 10000)
   public void testMultipleSubmissions() throws Exception {
     Map<String, LocalResource> lrDAG = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index d2dece3..62618cc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -204,7 +204,7 @@ public class TestTaskSchedulerEventHandler {
     AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
     Assert.assertEquals(mockCId, completedEvent.getContainerId());
     Assert.assertEquals("Container preempted internally", completedEvent.getDiagnostics());
-    Assert.assertFalse(completedEvent.isPreempted());
+    Assert.assertTrue(completedEvent.isPreempted());
     Assert.assertFalse(completedEvent.isDiskFailed());
     Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
         completedEvent.getTerminationCause());

http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index f273896..438c50d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -704,10 +704,10 @@ public class TestAMContainer {
     verify(wc.chh).unregister(wc.containerID);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
-    Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
-        ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
-        TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+    Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
+        ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
 
     assertFalse(wc.amContainer.isInErrorState());
 


[4/8] tez git commit: TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo (zjffdu)

Posted by ss...@apache.org.
TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cfa637a1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cfa637a1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cfa637a1

Branch: refs/heads/TEZ-2003
Commit: cfa637a16fa01b197c0310e03ef4a6e19883aaf1
Parents: ad6bf07
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Feb 2 10:45:32 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Mon Feb 2 10:45:32 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/DAGTerminationCause.java    |  3 ++
 .../tez/dag/app/dag/VertexTerminationCause.java |  3 ++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 25 ++++++---
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 23 +++++++++
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 53 +++++++++++++++++++-
 6 files changed, 101 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 03b0624..24c5b32 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo.
   TEZ-1999. IndexOutOfBoundsException during merge.
   TEZ-2000. Source vertex exists error during DAG submission.
   TEZ-2008. Add methods to SecureShuffleUtils to verify a reply based on a provided Key.

http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
index d01fb2f..5ae96a1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
@@ -39,6 +39,9 @@ public enum DAGTerminationCause {
   /** DAG failed during output commit. */
   COMMIT_FAILURE,
 
+  /** In some cases a vertex cannot be rerun, e.g. when its output has already been committed as a shared output of a vertex group */
+  VERTEX_RERUN_AFTER_COMMIT,
+
   /** DAG failed while trying to write recovery events */
   RECOVERY_FAILURE,
 

http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
index 4bfe001..2eeae3c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -43,6 +43,9 @@ public enum VertexTerminationCause {
   /** This vertex failed during commit. */
   COMMIT_FAILURE,
 
+  /** In some cases a vertex cannot be rerun, e.g. when its output has already been committed as a shared output of a vertex group */
+  VERTEX_RERUN_AFTER_COMMIT,
+
   /** This vertex failed as it had invalid number tasks. */
   INVALID_NUM_OF_TASKS, 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index f4e5bad..aa7723b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1083,6 +1083,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         dag.addDiagnostic(diagnosticMsg);
         return dag.finished(DAGState.FAILED);
       }
+      if(dag.terminationCause == DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT ){
+        String diagnosticMsg = "DAG failed due to vertex rerun after commit." +
+            " failedVertices:" + dag.numFailedVertices +
+            " killedVertices:" + dag.numKilledVertices;
+        LOG.info(diagnosticMsg);
+        dag.addDiagnostic(diagnosticMsg);
+        return dag.finished(DAGState.FAILED);
+      }
       if(dag.terminationCause == DAGTerminationCause.RECOVERY_FAILURE ){
         String diagnosticMsg = "DAG failed due to failure in recovery handling." +
             " failedVertices:" + dag.numFailedVertices +
@@ -1738,9 +1746,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     public DAGState transition(DAGImpl job, DAGEvent event) {
       DAGEventVertexReRunning vertexEvent = (DAGEventVertexReRunning) event;
       Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
-      job.numCompletedVertices--;
       boolean failed = job.vertexReRunning(vertex);
-
+      if (!failed) {
+        job.numCompletedVertices--;
+      }
 
       LOG.info("Vertex " + vertex.getLogIdentifier() + " re-running."
           + ", numCompletedVertices=" + job.numCompletedVertices
@@ -1848,11 +1857,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       if (groupList != null) {
         for (VertexGroupInfo groupInfo : groupList) {
           if (groupInfo.committed) {
-            LOG.info("Aborting job as committed vertex: "
-                + vertex.getLogIdentifier() + " is re-running");
-            enactKill(DAGTerminationCause.COMMIT_FAILURE,
-                VertexTerminationCause.COMMIT_FAILURE);
+            String msg = "Aborting job as committed vertex: "
+                + vertex.getLogIdentifier() + " is re-running";
+            LOG.info(msg);
+            addDiagnostic(msg);
+            enactKill(DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT,
+                VertexTerminationCause.VERTEX_RERUN_AFTER_COMMIT);
             return true;
+          } else {
+            groupInfo.successfulMembers--;
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 577c98b..c3f4ae7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1826,6 +1826,28 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertex.abortVertex(State.FAILED);
         return vertex.finished(VertexState.FAILED);
       }
+      else if (vertex.terminationCause == VertexTerminationCause.COMMIT_FAILURE) {
+        vertex.setFinishTime();
+        String diagnosticMsg = "Vertex failed/killed due to COMMIT_FAILURE failed. "
+            + "failedTasks:"
+            + vertex.failedTaskCount
+            + " killedTasks:"
+            + vertex.killedTaskCount;
+        LOG.info(diagnosticMsg);
+        vertex.abortVertex(State.FAILED);
+        return vertex.finished(VertexState.FAILED);
+      }
+      else if (vertex.terminationCause == VertexTerminationCause.VERTEX_RERUN_AFTER_COMMIT) {
+        vertex.setFinishTime();
+        String diagnosticMsg = "Vertex failed/killed due to invalid rerun failed. "
+            + "failedTasks:"
+            + vertex.failedTaskCount
+            + " killedTasks:"
+            + vertex.killedTaskCount;
+        LOG.info(diagnosticMsg);
+        vertex.abortVertex(State.FAILED);
+        return vertex.finished(VertexState.FAILED);
+      }
       else {
         //should never occur
         throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex:" + vertex.logIdentifier
@@ -3472,6 +3494,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         case INIT_FAILURE:
         case INTERNAL_ERROR:
         case AM_USERCODE_FAILURE:
+        case VERTEX_RERUN_AFTER_COMMIT:
         case OTHER_VERTEX_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break;
         default://should not occur
           throw new TezUncheckedException("VertexKilledTransition: event.terminationCause is unexpected: " + trigger);

http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 7c4d715..cae9059 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -91,6 +91,7 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
+import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -774,6 +775,9 @@ public class TestDAGImpl {
     doReturn(appAttemptId.getApplicationId())
         .when(groupAppContext).getApplicationID();
     doReturn(historyEventHandler).when(groupAppContext).getHistoryHandler();
+
+    // reset totalCommitCounter to 0
+    TotalCountingOutputCommitter.totalCommitCounter = 0;
     taskEventDispatcher = new TaskEventDispatcher();
     dispatcher.register(TaskEventType.class, taskEventDispatcher);
     taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
@@ -1080,7 +1084,54 @@ public class TestDAGImpl {
     Assert.assertEquals(DAGState.SUCCEEDED, groupDag.getState());
     Assert.assertEquals(2, TotalCountingOutputCommitter.totalCommitCounter);
   }  
-  
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testGroupDAGWithVertexReRunning() {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+    initDAG(groupDag);
+    startDAG(groupDag);
+    dispatcher.await();
+
+    Vertex v1 = groupDag.getVertex("vertex1");
+    Vertex v2 = groupDag.getVertex("vertex2");
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(v1.getVertexId()));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.await();
+    // commit should not happen due to vertex-rerunning
+    Assert.assertEquals(0, TotalCountingOutputCommitter.totalCommitCounter);
+
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.await();
+    // commit happens
+    Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
+    Assert.assertEquals(2, groupDag.getSuccessfulVertices());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testGroupDAGWithVertexReRunningAfterCommit() {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+    initDAG(groupDag);
+    startDAG(groupDag);
+    dispatcher.await();
+
+    Vertex v1 = groupDag.getVertex("vertex1");
+    Vertex v2 = groupDag.getVertex("vertex2");
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.await();
+    // vertex group commit happens
+    Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
+
+    // dag failed when vertex re-run happens after vertex group commit is done.
+    dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(v1.getVertexId()));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.FAILED, groupDag.getState());
+    Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT, groupDag.getTerminationCause());
+  }
+
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testDAGCompletionWithCommitSuccess() {


[3/8] tez git commit: TEZ-1999. IndexOutOfBoundsException during merge (rbalamohan)

Posted by ss...@apache.org.
TEZ-1999. IndexOutOfBoundsException during merge (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ad6bf07e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ad6bf07e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ad6bf07e

Branch: refs/heads/TEZ-2003
Commit: ad6bf07eba9923fca2627503652d16cfceb72d39
Parents: b726869
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Sat Jan 31 19:53:23 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Sat Jan 31 19:53:23 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../library/common/sort/impl/TezMerger.java     |  29 +-
 .../library/common/sort/impl/TestTezMerger.java | 518 ++++++++++++++++++-
 3 files changed, 525 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ad6bf07e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5c0bec0..03b0624 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1999. IndexOutOfBoundsException during merge.
   TEZ-2000. Source vertex exists error during DAG submission.
   TEZ-2008. Add methods to SecureShuffleUtils to verify a reply based on a provided Key.
   TEZ-1995. Build failure against hadoop 2.2.

http://git-wip-us.apache.org/repos/asf/tez/blob/ad6bf07e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index ed9a59d..5dd538a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -487,13 +487,32 @@ public class TezMerger {
       return value;
     }
 
+    private void populatePreviousKey() throws IOException {
+      key.reset();
+      BufferUtils.copy(key, prevKey);
+    }
+
     private void adjustPriorityQueue(Segment reader) throws IOException{
       long startPos = reader.getPosition();
-      if (hasNext != null && hasNext != KeyState.SAME_KEY) {
-        key.reset();
-        // TODO: This copy can be an unwanted operation when all keys are unique. Revisit this
-        // when we have better stats.
-        BufferUtils.copy(key, prevKey);
+      if (hasNext == null) {
+        /**
+         * hasNext can be null during first iteration & prevKey is initialized here.
+         * In cases of NO_KEY/NEW_KEY, we readjust the queue later. If new segment/file is found
+         * during this process, we need to compare keys for RLE across segment boundaries.
+         * prevKey can't be empty at that time (e.g custom comparators)
+         */
+        populatePreviousKey();
+      } else {
+        //indicates a key has been read already
+        if (hasNext != KeyState.SAME_KEY) {
+          /**
+           * Store previous key before reading next for later key comparisons.
+           * If all keys in a segment are unique, it would always hit this code path and key copies
+           * are wasteful in that case, as these comparisons are mainly done for RLE.
+           * TODO: When better stats are available, this condition can be avoided.
+           */
+          populatePreviousKey();
+        }
       }
       hasNext = reader.readRawKey();
       long endPos = reader.getPosition();

http://git-wip-us.apache.org/repos/asf/tez/blob/ad6bf07e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
index ac17d8d..1e14b9b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime.library.common.sort.impl;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
@@ -32,23 +33,27 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.TestMergeManager;
-import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import static org.junit.Assert.assertTrue;
+
 public class TestTezMerger {
 
   private static final Log LOG = LogFactory.getLog(TestTezMerger.class);
@@ -56,6 +61,11 @@ public class TestTezMerger {
   private static Configuration defaultConf = new Configuration();
   private static FileSystem localFs = null;
   private static Path workDir = null;
+  private static RawComparator comparator = null;
+  private static Random rnd = new Random();
+
+  private static final String SAME_KEY = "SAME_KEY";
+  private static final String DIFF_KEY = "DIFF_KEY";
 
   //store the generated data for final verification
   private static ListMultimap<Integer, Long> verificationDataSet = LinkedListMultimap.create();
@@ -76,6 +86,7 @@ public class TestTezMerger {
     Path baseDir = new Path(workDir, TestMergeManager.class.getName());
     String localDirs = baseDir.toString();
     defaultConf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+    comparator = ConfigUtils.getIntermediateInputKeyComparator(defaultConf);
   }
 
   @AfterClass
@@ -83,8 +94,7 @@ public class TestTezMerger {
     localFs.delete(workDir, true);
   }
 
-
-  @Test
+  @Test(timeout = 80000)
   public void testMerge() throws Exception {
     /**
      * test with number of files, keys per file and mergefactor
@@ -95,6 +105,7 @@ public class TestTezMerger {
     merge(100, 0, 5);
 
     //small files
+    merge(12, 4, 2);
     merge(2, 10, 2);
     merge(1, 10, 1);
     merge(5, 10, 3);
@@ -105,17 +116,487 @@ public class TestTezMerger {
     merge(5, 1000, 5);
     merge(5, 1000, 10);
     merge(5, 1000, 100);
+
+    //Create random mix of files (empty files + files with keys)
+    List<Path> pathList = new LinkedList<Path>();
+    pathList.clear();
+    pathList.addAll(createIFiles(Math.max(2, rnd.nextInt(20)), 0));
+    pathList.addAll(createIFiles(Math.max(2, rnd.nextInt(20)), Math.max(2, rnd.nextInt(10))));
+    merge(pathList, Math.max(2, rnd.nextInt(10)));
+  }
+
+  private Path createIFileWithTextData(List<String> data) throws IOException {
+    Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out");
+    FSDataOutputStream out = localFs.create(path);
+    IFile.Writer writer = new IFile.Writer(defaultConf, out, Text.class,
+        Text.class, null, null, null, true);
+    for (String key : data) {
+      writer.append(new Text(key), new Text(key + "_" + System.nanoTime()));
+    }
+    writer.close();
+    out.close();
+    return path;
+  }
+
+  /**
+   * Verify if the records are as per the expected data set
+   *
+   * @param records
+   * @param expectedResult
+   * @throws IOException
+   */
+  private void verify(TezRawKeyValueIterator records, String[][] expectedResult)
+      throws IOException {
+    //Iterate through merged dataset (shouldn't throw any exceptions)
+    int i = 0;
+    while (records.next()) {
+      DataInputBuffer key = records.getKey();
+      DataInputBuffer value = records.getValue();
+
+      Text k = new Text();
+      k.readFields(key);
+      Text v = new Text();
+      v.readFields(value);
+
+      assertTrue(k.toString().equals(expectedResult[i][0]));
+
+      String correctResult = expectedResult[i][1];
+
+      if (records.isSameKey()) {
+        assertTrue("Expected " + correctResult, correctResult.equalsIgnoreCase(SAME_KEY));
+        LOG.info("\tSame Key : key=" + k + ", val=" + v);
+      } else {
+        assertTrue("Expected " + correctResult, correctResult.equalsIgnoreCase(DIFF_KEY));
+        LOG.info("key=" + k + ", val=" + v);
+      }
+
+      i++;
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_WithEmptyStrings() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+    //Merge datasets with custom comparator
+    RawComparator rc = new CustomComparator();
+
+    LOG.info("Test with custom comparator with empty strings in middle");
+
+    //Test with 4 files, where some texts are empty strings
+    data.add("0");
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file with empty key
+    data.clear();
+    data.add("");
+    pathList.add(createIFileWithTextData(data));
+
+    //Third file
+    data.clear();
+    data.add("0");
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Fourth file
+    data.clear();
+    data.add("1");
+    data.add("2");
+    pathList.add(createIFileWithTextData(data));
+
+    TezRawKeyValueIterator records = merge(pathList, rc);
+
+    String[][] expectedResult =
+        {
+            //formatting intentionally
+            { "", DIFF_KEY },
+            { "0", DIFF_KEY },
+              { "0", SAME_KEY },
+              { "0", SAME_KEY },
+              { "0", SAME_KEY },
+            { "1", DIFF_KEY },
+            { "2", DIFF_KEY }
+        };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_No_RLE() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+    //Merge datasets with custom comparator
+    RawComparator rc = new CustomComparator();
+
+    LOG.info("Test with custom comparator with no RLE");
+
+    //Test with 3 files,
+    data.add("1");
+    data.add("4");
+    data.add("5");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file
+    data.clear();
+    data.add("2");
+    data.add("6");
+    data.add("7");
+    pathList.add(createIFileWithTextData(data));
+
+    //Third file
+    data.clear();
+    data.add("3");
+    data.add("8");
+    data.add("9");
+    pathList.add(createIFileWithTextData(data));
+
+    TezRawKeyValueIterator records = merge(pathList, rc);
+
+    String[][] expectedResult =
+        {
+            { "1", DIFF_KEY },
+            { "2", DIFF_KEY },
+            { "3", DIFF_KEY },
+            { "4", DIFF_KEY },
+            { "5", DIFF_KEY },
+            { "6", DIFF_KEY },
+            { "7", DIFF_KEY },
+            { "8", DIFF_KEY },
+            { "9", DIFF_KEY }
+        };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_RLE_acrossFiles() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+
+    LOG.info("Test with custom comparator with RLE spanning across segment boundaries");
+
+    //Test with 2 files, where the RLE keys can span across files
+    //First file
+    data.clear();
+    data.add("0");
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file
+    data.clear();
+    data.add("0");
+    data.add("1");
+    pathList.add(createIFileWithTextData(data));
+
+    //Merge datasets with custom comparator
+    RawComparator rc = new CustomComparator();
+    TezRawKeyValueIterator records = merge(pathList, rc);
+
+    //expected result
+    String[][] expectedResult =
+        {
+            //formatting intentionally
+            { "0", DIFF_KEY },
+              { "0", SAME_KEY },
+              { "0", SAME_KEY },
+            { "1", DIFF_KEY }
+        };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_mixedFiles() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+
+    LOG.info("Test with custom comparator with mixed set of segments (empty, non-empty etc)");
+
+    //Test with 4 files, where the RLE keys can span across files
+    //First file
+    data.clear();
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file; empty file
+    data.clear();
+    pathList.add(createIFileWithTextData(data));
+
+    //Third file with empty key
+    data.clear();
+    data.add("");
+    pathList.add(createIFileWithTextData(data));
+
+    //Fourth file with repeated keys
+    data.clear();
+    data.add("0");
+    data.add("0");
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Merge datasets with custom comparator
+    RawComparator rc = new CustomComparator();
+    TezRawKeyValueIterator records = merge(pathList, rc);
+
+    //expected result
+    String[][] expectedResult =
+        {
+            //formatting intentionally
+            { "", DIFF_KEY },
+            { "0", DIFF_KEY },
+              { "0", SAME_KEY },
+              { "0", SAME_KEY },
+              { "0", SAME_KEY }
+        };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_RLE() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+
+    LOG.info("Test with custom comparator 2 files one containing RLE and also other segment "
+        + "starting with same key");
+
+    //Test with 2 files, same keys in middle of file
+    //First file
+    data.clear();
+    data.add("1");
+    data.add("2");
+    data.add("2");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file
+    data.clear();
+    data.add("2");
+    data.add("3");
+    pathList.add(createIFileWithTextData(data));
+
+    TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+
+    String[][] expectedResult =
+        {
+            //formatting intentionally
+            { "1", DIFF_KEY },
+            { "2", DIFF_KEY },
+              { "2", SAME_KEY },
+              { "2", SAME_KEY },
+            { "3", DIFF_KEY }
+        };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_RLE2() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+
+    LOG.info(
+        "Test with custom comparator 3 files with RLE (starting keys) spanning across boundaries");
+
+    //Test with 3 files, same keys in middle of file
+    //First file
+    data.clear();
+    data.add("0");
+    data.add("1");
+    data.add("1");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file
+    data.clear();
+    data.add("0");
+    data.add("1");
+    pathList.add(createIFileWithTextData(data));
+
+    //Third file
+    data.clear();
+    data.add("0");
+    data.add("1");
+    data.add("1");
+    pathList.add(createIFileWithTextData(data));
+
+    TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+    String[][] expectedResult =
+        {
+            //formatting intentionally
+            { "0", DIFF_KEY },
+              { "0", SAME_KEY },
+              { "0", SAME_KEY },
+            { "1", DIFF_KEY },
+              { "1", SAME_KEY },
+              { "1", SAME_KEY },
+              { "1", SAME_KEY },
+              { "1", SAME_KEY }
+
+        };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+
+    LOG.info(
+        "Test with custom comparator 3 files with RLE (starting keys) spanning across boundaries");
+
+    //Test with 3 files
+    //First file
+    data.clear();
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file
+    data.clear();
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Third file
+    data.clear();
+    data.add("1");
+    pathList.add(createIFileWithTextData(data));
+
+    TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+    String[][] expectedResult =
+        {
+            //formatting intentionally
+            { "0", DIFF_KEY },
+              { "0", SAME_KEY },
+            { "1", DIFF_KEY }
+        };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_RLE3() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+
+    LOG.info("Test with custom comparator");
+
+    //Test with 2 files, same keys in middle of file
+    //First file
+    data.clear();
+    data.add("0");
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file
+    data.clear();
+    data.add("0");
+    data.add("1");
+    data.add("1");
+    pathList.add(createIFileWithTextData(data));
+
+    TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+
+    String[][] expectedResult =
+        {
+            //formatting intentionally
+            { "0", DIFF_KEY },
+              { "0", SAME_KEY },
+            { "1", DIFF_KEY },
+              { "1", SAME_KEY } };
+
+    verify(records, expectedResult);
+    pathList.clear();
+    data.clear();
+  }
+
+  @Test(timeout = 5000)
+  public void testWithCustomComparator_allEmptyFiles() throws Exception {
+    List<Path> pathList = new LinkedList<Path>();
+    List<String> data = Lists.newLinkedList();
+
+    LOG.info("Test with custom comparator where all files are empty");
+
+    //First file
+    pathList.add(createIFileWithTextData(data));
+
+    //Second file
+    pathList.add(createIFileWithTextData(data));
+
+    //Third file
+    pathList.add(createIFileWithTextData(data));
+
+    //Fourth file
+    pathList.add(createIFileWithTextData(data));
+
+    TezRawKeyValueIterator records = merge(pathList, new CustomComparator());
+
+    String[][] expectedResult = new String[0][0];
+
+    verify(records, expectedResult);
+  }
+
+  /**
+   * Merge the data sets
+   *
+   * @param pathList
+   * @param rc
+   * @return
+   * @throws IOException
+   */
+  private TezRawKeyValueIterator merge(List<Path> pathList, RawComparator rc) throws IOException {
+    TezMerger merger = new TezMerger();
+    TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
+        LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
+        true, 4, new Path(workDir, "tmp_" + System.nanoTime()), ((rc == null) ? comparator : rc),
+        new Reporter(), null, null,
+        null, new Progress());
+    return records;
+  }
+
+
+
+  //Sample comparator to test TEZ-1999 corner case
+  static class CustomComparator extends WritableComparator {
+    @Override
+    //Not a valid comparison, but just to check byte boundaries
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      Preconditions.checkArgument(l2 > 0 && l1 > 0, "l2=" + l2 + ",l1=" + l1);
+      ByteBuffer bb1 = ByteBuffer.wrap(b1, s1, l1);
+      ByteBuffer bb2 = ByteBuffer.wrap(b2, s2, l2);
+      return bb1.compareTo(bb2);
+    }
+  }
+
+  private void merge(List<Path> pathList, int mergeFactor) throws Exception {
+    merge(pathList, mergeFactor, null);
   }
 
   private void merge(int fileCount, int keysPerFile, int mergeFactor) throws Exception {
     List<Path> pathList = createIFiles(fileCount, keysPerFile);
+    merge(pathList, mergeFactor, null);
+  }
 
+  private void merge(List<Path> pathList, int mergeFactor, RawComparator rc) throws Exception {
     //Merge datasets
     TezMerger merger = new TezMerger();
     TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
         LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
         true, mergeFactor, new Path(workDir, "tmp_" + System.nanoTime()),
-        ConfigUtils.getIntermediateInputKeyComparator(defaultConf), new Reporter(), null, null,
+        ((rc == null) ? comparator : rc), new Reporter(), null, null,
         null,
         new Progress());
 
@@ -134,9 +615,9 @@ public class TestTezMerger {
       if (records.isSameKey()) {
         LOG.info("\tSame Key : key=" + k.get() + ", val=" + v.get());
         //More than one key should be present in the source data
-        Assert.assertTrue(verificationDataSet.get(k.get()).size() > 1);
+        assertTrue(verificationDataSet.get(k.get()).size() > 1);
         //Ensure this is same as the previous key we saw
-        Assert.assertTrue(pk == k.get());
+        assertTrue("previousKey=" + pk + ", current=" + k.get(), pk == k.get());
       } else {
         LOG.info("key=" + k.get() + ", val=" + v.get());
       }
@@ -147,30 +628,30 @@ public class TestTezMerger {
     }
 
     //Verify if the number of distinct entries is the same in source and the test
-    Assert.assertTrue("dataMap=" + dataMap.keySet().size() + ", verificationSet=" +
-        verificationDataSet.keySet().size(),
+    assertTrue("dataMap=" + dataMap.keySet().size() + ", verificationSet=" +
+            verificationDataSet.keySet().size(),
         dataMap.keySet().size() == verificationDataSet.keySet().size());
 
     //Verify with source data
     for (Integer key : verificationDataSet.keySet()) {
-      Assert.assertTrue("Data size for " + key + " not matching with source; dataSize:" + dataMap
+      assertTrue("Data size for " + key + " not matching with source; dataSize:" + dataMap
               .get(key).intValue() + ", source:" + verificationDataSet.get(key).size(),
           dataMap.get(key).intValue() == verificationDataSet.get(key).size());
     }
 
     //Verify if every key has the same number of repeated items in the source dataset as well
     for (Map.Entry<Integer, Integer> entry : dataMap.entrySet()) {
-      Assert.assertTrue(entry.getKey() + "", verificationDataSet.get(entry.getKey()).size() == entry
+      assertTrue(entry.getKey() + "", verificationDataSet.get(entry.getKey()).size() == entry
           .getValue());
     }
 
     LOG.info("******************");
+    verificationDataSet.clear();
   }
 
   private List<Path> createIFiles(int fileCount, int keysPerFile)
       throws IOException {
     List<Path> pathList = Lists.newLinkedList();
-    verificationDataSet.clear();
     Random rnd = new Random();
     for (int i = 0; i < fileCount; i++) {
       int repeatCount = ((i % 2 == 0) && keysPerFile > 0) ? rnd.nextInt(keysPerFile) : 0;
@@ -180,8 +661,10 @@ public class TestTezMerger {
     return pathList;
   }
 
-  static Path writeIFile(int keysPerFile, int repeatCount) throws IOException {
+  static Path writeIFile(int keysPerFile, int repeatCount) throws
+      IOException {
     TreeMultimap<Integer, Long> dataSet = createDataForIFile(keysPerFile, repeatCount);
+    LOG.info("DataSet size : " + dataSet.size());
     Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out");
     FSDataOutputStream out = localFs.create(path);
     //create IFile with RLE
@@ -202,7 +685,7 @@ public class TestTezMerger {
   /**
    * Generate data set for ifile.  Create repeated keys if needed.
    *
-   * @param keyCount approximate number of keys to be created
+   * @param keyCount    approximate number of keys to be created
    * @param repeatCount number of times a key should be repeated
    * @return
    */
@@ -210,9 +693,9 @@ public class TestTezMerger {
     TreeMultimap<Integer, Long> dataSet = TreeMultimap.create();
     Random rnd = new Random();
     for (int i = 0; i < keyCount; i++) {
-      if (repeatCount > 0 && (rnd.nextInt(keyCount) % 2  == 0)) {
+      if (repeatCount > 0 && (rnd.nextInt(keyCount) % 2 == 0)) {
         //repeat this key
-        for(int j = 0; j < repeatCount; j++) {
+        for (int j = 0; j < repeatCount; j++) {
           IntWritable key = new IntWritable(rnd.nextInt(keyCount));
           LongWritable value = new LongWritable(System.nanoTime());
           dataSet.put(key.get(), value.get());
@@ -234,7 +717,6 @@ public class TestTezMerger {
     return dataSet;
   }
 
-
   private static class Reporter implements Progressable {
     @Override
     public void progress() {


[6/8] tez git commit: TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers (rbalamohan)

Posted by ss...@apache.org.
TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5cf9105f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5cf9105f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5cf9105f

Branch: refs/heads/TEZ-2003
Commit: 5cf9105fe47bb07aa42f5b3132ba13e81fe205a8
Parents: 7096d8a
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Thu Feb 5 08:25:24 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Thu Feb 5 08:25:24 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../library/common/shuffle/HttpConnection.java  |  1 +
 .../library/common/shuffle/ShuffleUtils.java    | 39 ++++++++++++++++++++
 .../common/shuffle/impl/ShuffleManager.java     | 19 +---------
 .../orderedgrouped/ShuffleScheduler.java        | 21 ++---------
 5 files changed, 47 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 39d7f81..6a494ca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers.
   TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo.
   TEZ-1999. IndexOutOfBoundsException during merge.
   TEZ-2000. Source vertex exists error during DAG submission.

http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
index 4732a5a..1a5de41 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
@@ -237,6 +237,7 @@ public class HttpConnection {
     }
     // verify that replyHash is HMac of encHash
     SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
+    //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host
     LOG.info("for url=" + url +
       " sent hash and receievd reply " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 629bab8..af02f9e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.ByteBuffer;
+import java.text.DecimalFormat;
 import java.util.List;
 
 import javax.crypto.SecretKey;
@@ -50,6 +51,14 @@ public class ShuffleUtils {
   private static final Log LOG = LogFactory.getLog(ShuffleUtils.class);
   public static final String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
 
+  static final ThreadLocal<DecimalFormat> MBPS_FORMAT =
+      new ThreadLocal<DecimalFormat>() {
+        @Override
+        protected DecimalFormat initialValue() {
+          return new DecimalFormat("0.00");
+        }
+      };
+
   public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
       throws IOException {
     DataInputByteBuffer in = new DataInputByteBuffer();
@@ -233,5 +242,35 @@ public class ShuffleUtils {
     sb.append("]");
     return sb.toString();
   }
+
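+  /**
+   * Log individual fetch complete event.
+   * This log information would be used by tez-tool/perf-analyzer/shuffle tools for mining
+   * - amount of data transferred between source and destination machines
+   * - time taken to transfer data between source and destination machines
+   * - details on DISK/DISK_DIRECT/MEMORY based shuffles
+   *
+   * @param log                  logger the completion line is written to
+   * @param millis               time taken for the fetch, in milliseconds
+   * @param bytesCompressed      compressed bytes fetched; used to compute the transfer rate
+   * @param bytesDecompressed    decompressed size of the fetched data (logged only)
+   * @param outputType           where the fetched data was stored, e.g. DISK, DISK_DIRECT, MEMORY
+   * @param srcAttemptIdentifier source attempt whose output was fetched
+   */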
+  public static void logIndividualFetchComplete(Log log, long millis, long
+      bytesCompressed,
+      long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) {
+    double rate = 0;
+    if (millis != 0) {
+      rate = bytesCompressed / ((double) millis / 1000);
+      rate = rate / (1024 * 1024);
+    }
+    log.info(
+        "Completed fetch for attempt: "
+            + srcAttemptIdentifier + " to " + outputType +
+            ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed +
+            ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
+            MBPS_FORMAT.get().format(rate) + " MB/s");
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 13296c7..3dc8156 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -519,8 +519,8 @@ public class ShuffleManager implements FetcherCallback {
         if (!completedInputSet.contains(inputIdentifier)) {
           fetchedInput.commit();
           committed = true;
-          logIndividualFetchComplete(copyDuration, fetchedBytes, decompressedLength, fetchedInput,
-              srcAttemptIdentifier);
+          ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration,
+              fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
 
           // Processing counters for completed and commit fetches only. Need
           // additional counters for excessive fetches - which primarily comes
@@ -731,22 +731,7 @@ public class ShuffleManager implements FetcherCallback {
         + mbpsFormat.format(transferRate) + " MB/s)");
   }
 
-  private void logIndividualFetchComplete(long millis, long fetchedBytes, long decompressedLength,
-                                          FetchedInput fetchedInput,
-                                          InputAttemptIdentifier srcAttemptIdentifier) {
-    double rate = 0;
-    if (millis != 0) {
-      rate = fetchedBytes / ((double) millis / 1000);
-      rate = rate / (1024 * 1024);
-    }
 
-    LOG.info(
-        "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType() +
-            ", CompressedSize=" + fetchedBytes + ", DecompressedSize=" + decompressedLength +
-            ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
-            mbpsFormat.format(rate) + " MB/s");
-  }
-  
   private class SchedulerFutureCallback implements FutureCallback<Void> {
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 066b94a..57e904b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -47,6 +47,7 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
 
 import com.google.common.collect.Lists;
@@ -103,7 +104,7 @@ class ShuffleScheduler {
 
   private long totalBytesShuffledTillNow = 0;
   private DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
-  
+
   public ShuffleScheduler(InputContext inputContext,
                           Configuration conf,
                           int numberOfInputs,
@@ -187,8 +188,8 @@ class ShuffleScheduler {
         }
 
         output.commit();
-        logIndividualFetchComplete(millis, bytesCompressed, bytesDecompressed, output,
-            srcAttemptIdentifier);
+        ShuffleUtils.logIndividualFetchComplete(LOG, millis, bytesCompressed,
+            bytesDecompressed, output.getType().toString(), srcAttemptIdentifier);
         if (output.getType() == Type.DISK) {
           bytesShuffledToDisk.increment(bytesCompressed);
         } else if (output.getType() == Type.DISK_DIRECT) {
@@ -234,20 +235,6 @@ class ShuffleScheduler {
     // TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation.
   }
 
-  private void logIndividualFetchComplete(long millis, long bytesCompressed, long bytesDecompressed,
-                                          MapOutput output,
-                                          InputAttemptIdentifier srcAttemptIdentifier) {
-    double rate = 0;
-    if (millis != 0) {
-      rate = bytesCompressed / ((double) millis / 1000);
-      rate = rate / (1024 * 1024);
-    }
-    LOG.info(
-        "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + output.getType() +
-            ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed +
-            ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
-            mbpsFormat.format(rate) + " MB/s");
-  }
 
   private void logProgress() {
     double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);


[7/8] tez git commit: TEZ-2020. For 1-1 edge vertex configured event may be sent incorrectly (bikas)

Posted by ss...@apache.org.
TEZ-2020. For 1-1 edge vertex configured event may be sent incorrectly (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b804b8f0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b804b8f0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b804b8f0

Branch: refs/heads/TEZ-2003
Commit: b804b8f03e9d7934202f3838841fe1abb416f480
Parents: 5cf9105
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 4 19:48:19 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 4 19:48:41 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../event/VertexEventOneToOneSourceSplit.java   |  50 ---------
 .../tez/dag/app/dag/event/VertexEventType.java  |   1 -
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 111 +------------------
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  39 +++----
 .../vertexmanager/InputReadyVertexManager.java  |  80 ++++++++-----
 .../TestInputReadyVertexManager.java            |  64 ++++++-----
 7 files changed, 108 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6a494ca..2c54b4b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -176,6 +176,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2020. For 1-1 edge vertex configured event may be sent incorrectly
   TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria
   before sending notification
   TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism 

http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java
deleted file mode 100644
index a7e580e..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.dag.event;
-
-import org.apache.tez.dag.records.TezVertexID;
-
-public class VertexEventOneToOneSourceSplit extends VertexEvent {
-  final int numTasks;
-  final TezVertexID originalSplitVertex;
-  final TezVertexID senderVertex;
-  
-  public VertexEventOneToOneSourceSplit(TezVertexID vertexId,
-      TezVertexID senderVertex,
-      TezVertexID originalSplitVertex,
-      int numTasks) {
-    super(vertexId, VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT);
-    this.numTasks = numTasks;
-    this.senderVertex = senderVertex;
-    this.originalSplitVertex = originalSplitVertex;
-  }
-  
-  public int getNumTasks() {
-    return numTasks;
-  }
-  
-  public TezVertexID getOriginalSplitSource() {
-    return originalSplitVertex;
-  }
-  
-  public TezVertexID getSenderVertex() {
-    return senderVertex;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 1d0222e..5eb4929 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -45,7 +45,6 @@ public enum VertexEventType {
   V_MANAGER_USER_CODE_ERROR,
   
   V_ROUTE_EVENT,
-  V_ONE_TO_ONE_SOURCE_SPLIT,
   
   //Producer: VertexInputInitializer
   V_ROOT_INPUT_INITIALIZED,

http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 9deccd2..865b182 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -116,7 +116,6 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
 import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
-import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
 import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
@@ -343,10 +342,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexEventType.V_ROOT_INPUT_INITIALIZED,
               new RootInputInitializedTransition())
           .addTransition(VertexState.INITIALIZING,
-              EnumSet.of(VertexState.INITIALIZING),
-              VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
-              new OneToOneSourceSplitTransition())
-          .addTransition(VertexState.INITIALIZING,
               EnumSet.of(VertexState.INITED, VertexState.FAILED),
               VertexEventType.V_READY_TO_INIT,
               new VertexInitializedTransition())
@@ -400,10 +395,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               new SourceVertexStartedTransition())
           .addTransition(VertexState.INITED,
               EnumSet.of(VertexState.INITED),
-              VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
-              new OneToOneSourceSplitTransition())
-          .addTransition(VertexState.INITED,
-              EnumSet.of(VertexState.INITED),
               VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
               SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
           .addTransition(VertexState.INITED,
@@ -443,10 +434,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexState.ERROR),
               VertexEventType.V_TASK_COMPLETED,
               new TaskCompletedTransition())
-          .addTransition(VertexState.RUNNING,
-              EnumSet.of(VertexState.RUNNING),
-              VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
-              new OneToOneSourceSplitTransition())
           .addTransition(VertexState.RUNNING, VertexState.TERMINATING,
               VertexEventType.V_TERMINATE,
               new VertexKilledTransition())
@@ -546,7 +533,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED,
-                  VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_NULL_EDGE_INITIALIZED,
@@ -569,7 +555,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
-                  VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
@@ -590,7 +575,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_MANAGER_USER_CODE_ERROR,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
-                  VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_INTERNAL_ERROR,
@@ -692,7 +676,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private final UserGroupInformation dagUgi;
 
   private boolean parallelismSet = false;
-  private TezVertexID originalOneToOneSplitSource = null;
 
   private AtomicBoolean committed = new AtomicBoolean(false);
   private AtomicBoolean aborted = new AtomicBoolean(false);
@@ -1497,21 +1480,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         }
       }
 
-      for (Map.Entry<Vertex, Edge> entry : targetVertices.entrySet()) {
-        Edge edge = entry.getValue();
-        if (edge.getEdgeProperty().getDataMovementType()
-            == DataMovementType.ONE_TO_ONE) {
-          // inform these target vertices that we have changed parallelism
-          VertexEventOneToOneSourceSplit event =
-              new VertexEventOneToOneSourceSplit(entry.getKey().getVertexId(),
-                  getVertexId(),
-                  ((originalOneToOneSplitSource!=null) ?
-                      originalOneToOneSplitSource : getVertexId()),
-                  numTasks);
-          getEventHandler().handle(event);
-        }
-      }
-
     } finally {
       writeLock.unlock();
     }
@@ -1532,21 +1500,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   
   @Override
   public void vertexReconfigurationPlanned() {
-    vertexReconfigurationPlanned(false);
-  }
-  
-  public void vertexReconfigurationPlanned(boolean testOverride) {
     writeLock.lock();
     try {
-      if (testOverride) {
-        Preconditions.checkState(vmIsInitialized.get() && completelyConfiguredSent.get(),
-            "test should override only failed cases");
-      } else {
-        Preconditions.checkState(!vmIsInitialized.get(),
-            "context.vertexReconfigurationPlanned() cannot be called after initialize()");
-        Preconditions.checkState(!completelyConfiguredSent.get(), "vertexReconfigurationPlanned() "
-            + " cannot be invoked after the vertex has been configured.");
-      }
+      Preconditions.checkState(!vmIsInitialized.get(),
+          "context.vertexReconfigurationPlanned() cannot be called after initialize()");
+      Preconditions.checkState(!completelyConfiguredSent.get(), "vertexReconfigurationPlanned() "
+          + " cannot be invoked after the vertex has been configured.");
       this.vertexToBeReconfiguredByManager = true;
     } finally {
       writeLock.unlock();
@@ -3147,68 +3106,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
-  public static class OneToOneSourceSplitTransition implements
-    MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
-
-    @Override
-    public VertexState transition(VertexImpl vertex, VertexEvent event) {
-      VertexEventOneToOneSourceSplit splitEvent =
-          (VertexEventOneToOneSourceSplit)event;
-      TezVertexID originalSplitSource = splitEvent.getOriginalSplitSource();
-
-      if (vertex.originalOneToOneSplitSource != null) {
-        VertexState state = vertex.getState();
-        Preconditions
-            .checkState(
-                (state == VertexState.INITIALIZING
-                    || state == VertexState.INITED || state == VertexState.RUNNING),
-                " Unexpected 1-1 split for vertex " + vertex.getLogIdentifier()
-                    + " in state " + vertex.getState() + " . Split in vertex "
-                    + originalSplitSource + " sent by vertex "
-                    + splitEvent.getSenderVertex() + " numTasks "
-                    + splitEvent.getNumTasks());
-        if (vertex.originalOneToOneSplitSource.equals(originalSplitSource)) {
-          // ignore another split event that may have come from a different
-          // path in the DAG. We have already split because of that source
-          LOG.info("Ignoring split of vertex " + vertex.getLogIdentifier() +
-              " because of split in vertex " + originalSplitSource +
-              " sent by vertex " + splitEvent.getSenderVertex() +
-              " numTasks " + splitEvent.getNumTasks());
-          return state;
-        }
-        // cannot split from multiple sources
-        throw new TezUncheckedException("Vertex: " + vertex.getLogIdentifier() +
-            " asked to split by: " + originalSplitSource +
-            " but was already split by:" + vertex.originalOneToOneSplitSource);
-      }
-
-      LOG.info("Splitting vertex " + vertex.getLogIdentifier() +
-          " because of split in vertex " + originalSplitSource +
-          " sent by vertex " + splitEvent.getSenderVertex() +
-          " numTasks " + splitEvent.getNumTasks());
-      vertex.originalOneToOneSplitSource = originalSplitSource;
-      try {
-        vertex.setParallelism(splitEvent.getNumTasks(), null, null, null, false);
-      } catch (Exception e) {
-        // ingore this exception, should not happen
-        LOG.error("Unexpected exception, Just set Parallelims to a specified value, not involve EdgeManager,"
-            + "exception should not happen here", e);
-      }
-      if (vertex.getState() == VertexState.RUNNING ||
-          vertex.getState() == VertexState.INITED) {
-        return vertex.getState();
-      } else {
-        Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
-            " Unexpected 1-1 split for vertex " + vertex.getLogIdentifier() +
-                " in state " + vertex.getState() +
-                " . Split in vertex " + originalSplitSource +
-                " sent by vertex " + splitEvent.getSenderVertex() +
-                " numTasks " + splitEvent.getNumTasks());
-        return vertex.getState();
-      }
-    }
-  }
-
   // Temporary to maintain topological order while starting vertices. Not useful
   // since there's not much difference between the INIT and RUNNING states.
   public static class SourceVertexStartedTransition implements

http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index e94bb17..83a3a8a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2447,14 +2447,14 @@ public class TestVertexImpl {
   
   @Test(timeout = 5000)
   public void testVertexSetParallelism() throws Exception {
-    initAllVertices(VertexState.INITED);
     VertexImpl v3 = vertices.get("vertex3");
+    v3.vertexReconfigurationPlanned();
+    initAllVertices(VertexState.INITED);
     Assert.assertEquals(2, v3.getTotalTasks());
     Map<TezTaskID, Task> tasks = v3.getTasks();
     Assert.assertEquals(2, tasks.size());
     TezTaskID firstTask = tasks.keySet().iterator().next();
 
-    v3.vertexReconfigurationPlanned(true);
     VertexImpl v1 = vertices.get("vertex1");
     startVertex(vertices.get("vertex2"));
     startVertex(v1);
@@ -2477,13 +2477,13 @@ public class TestVertexImpl {
 
   @Test(timeout = 5000)
   public void testVertexSetParallelismIncreaseException() throws Exception {
-    initAllVertices(VertexState.INITED);
     VertexImpl v3 = vertices.get("vertex3");
+    v3.vertexReconfigurationPlanned();
+    initAllVertices(VertexState.INITED);
     Assert.assertEquals(2, v3.getTotalTasks());
     Map<TezTaskID, Task> tasks = v3.getTasks();
     Assert.assertEquals(2, tasks.size());
 
-    v3.vertexReconfigurationPlanned(true);
     VertexImpl v1 = vertices.get("vertex1");
     startVertex(vertices.get("vertex2"));
     startVertex(v1);
@@ -2500,13 +2500,13 @@ public class TestVertexImpl {
   
   @Test(timeout = 5000)
   public void testVertexSetParallelismMultipleException() throws Exception {
-    initAllVertices(VertexState.INITED);
     VertexImpl v3 = vertices.get("vertex3");
+    v3.vertexReconfigurationPlanned();
+    initAllVertices(VertexState.INITED);
     Assert.assertEquals(2, v3.getTotalTasks());
     Map<TezTaskID, Task> tasks = v3.getTasks();
     Assert.assertEquals(2, tasks.size());
 
-    v3.vertexReconfigurationPlanned(true);
     VertexImpl v1 = vertices.get("vertex1");
     startVertex(vertices.get("vertex2"));
     startVertex(v1);
@@ -2561,6 +2561,8 @@ public class TestVertexImpl {
 
   @Test(timeout = 5000)
   public void testSetCustomEdgeManager() throws Exception {
+    VertexImpl v5 = vertices.get("vertex5"); // Vertex5 linked to v3 (v3 src, v5 dest)
+    v5.vertexReconfigurationPlanned();
     initAllVertices(VertexState.INITED);
     Edge edge = edges.get("e4");
     EdgeManagerPlugin em = edge.getEdgeManager();
@@ -2574,10 +2576,7 @@ public class TestVertexImpl {
     edgeManagerDescriptor.setUserPayload(userPayload);
 
     Vertex v3 = vertices.get("vertex3");
-    VertexImpl v5 = vertices.get("vertex5"); // Vertex5 linked to v3 (v3 src, v5
-                                         // dest)
 
-    v5.vertexReconfigurationPlanned(true);
     Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors =
         Collections.singletonMap(v3.getName(), edgeManagerDescriptor);
     v5.setParallelism(v5.getTotalTasks() - 1, null, edgeManagerDescriptors, null, true); // Must decrease.
@@ -3396,25 +3395,23 @@ public class TestVertexImpl {
       Assert.assertEquals(v1Hints.get(i), v1.getTaskLocationHints()[i]);
     }
     Assert.assertEquals(true, initializerManager1.hasShutDown);
-    
+
+    startVertex(v1);
+
     Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
-    Assert.assertEquals(VertexState.INITED, vertices.get("vertex2").getState());
     Assert.assertEquals(numTasks, vertices.get("vertex3").getTotalTasks());
-    Assert.assertEquals(VertexState.INITED, vertices.get("vertex3").getState());
     Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks());
-    Assert.assertEquals(VertexState.INITED, vertices.get("vertex5").getState());
-    // v5, v6 still initializing since edge is null
-    Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
+    // v4, v6 still initializing since edge is null
     Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex6").getState());
     
-    startVertex(v1);
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex1").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex2").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex5").getState());
-    // v5, v6 still initializing since edge is null
-    Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
+    // v4, v6 still initializing since edge is null
     Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex6").getState());
     
     mockEdgeManagerDescriptor =
         EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
@@ -3423,7 +3420,7 @@ public class TestVertexImpl {
     e.setCustomEdgeManager(mockEdgeManagerDescriptor);
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
-    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex6").getState());
   }
   
   @Test(timeout = 5000)
@@ -3434,6 +3431,7 @@ public class TestVertexImpl {
     dagPlan = createDAGPlanForOneToOneSplit(null, numTasks, false);
     setupPostDagCreation();
     VertexImpl v1 = vertices.get("vertex1");
+    v1.vertexReconfigurationPlanned();
     initAllVertices(VertexState.INITED);
     
     // fudge vertex manager so that tasks dont start running
@@ -3441,7 +3439,6 @@ public class TestVertexImpl {
         VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()),
         v1, appContext, mock(StateChangeNotifier.class));
     v1.vertexManager.initialize();
-    v1.vertexReconfigurationPlanned(true);
     startVertex(v1);
     
     Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
@@ -3473,6 +3470,7 @@ public class TestVertexImpl {
     dagPlan = createDAGPlanForOneToOneSplit(null, numTasks, false);
     setupPostDagCreation();
     VertexImpl v1 = vertices.get("vertex1");
+    v1.vertexReconfigurationPlanned();
     initAllVertices(VertexState.INITED);
     
     // fudge vertex manager so that tasks dont start running
@@ -3486,7 +3484,6 @@ public class TestVertexImpl {
     Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks());
     // change parallelism
     int newNumTasks = 3;
-    v1.vertexReconfigurationPlanned(true);
     v1.setParallelism(newNumTasks, null, null, null, true);
     v1.doneReconfiguringVertex();
     dispatcher.await();

http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index f5c187e..e2e9dd3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -55,8 +56,10 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
   int oneToOneSrcTasksDoneCount[];
   TaskLocationHint oneToOneLocationHints[];
   int numOneToOneEdges;
-  int numSignalsToWaitFor;
+  int numConfiguredSources;
   Multimap<String, Integer> pendingCompletions = LinkedListMultimap.create();
+  AtomicBoolean configured;
+  AtomicBoolean started;
 
   public InputReadyVertexManager(VertexManagerPluginContext context) {
     super(context);
@@ -76,13 +79,10 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
     }
   }
   
-  void start() {
-    if (!ready()) {
-      return;
-    }
+  private void configure() {
+    Preconditions.checkState(!configured.get(), "Vertex: " + getContext().getVertexName());
     int numManagedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
     LOG.info("Managing " + numManagedTasks + " tasks for vertex: " + getContext().getVertexName());
-    taskIsStarted = new boolean[numManagedTasks];
 
     // find out about all input edge types. If there is a custom edge then 
     // TODO Until TEZ-1013 we cannot handle custom input formats
@@ -116,32 +116,51 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
     }
     
     if (numOneToOneEdges > 0) {
+      Preconditions
+          .checkState(oneToOneSrcTaskCount >= 0, "Vertex: " + getContext().getVertexName());
       if (oneToOneSrcTaskCount != numManagedTasks) {
-        throw new TezUncheckedException(
-            "Managed task number must equal 1-1 source task number");
+        numManagedTasks = oneToOneSrcTaskCount;
+        // must change parallelism to make them the same
+        LOG.info("Update parallelism of vertex: " + getContext().getVertexName() + 
+            " to " + oneToOneSrcTaskCount + " to match source 1-1 vertices.");
+        getContext().setVertexParallelism(oneToOneSrcTaskCount, null, null, null);
       }
       oneToOneSrcTasksDoneCount = new int[oneToOneSrcTaskCount];
       oneToOneLocationHints = new TaskLocationHint[oneToOneSrcTaskCount];
     }
+    
+    Preconditions.checkState(numManagedTasks >=0, "Vertex: " + getContext().getVertexName());
+    taskIsStarted = new boolean[numManagedTasks];
 
-    for (Map.Entry<String, Collection<Integer>> entry :  pendingCompletions.asMap().entrySet()) {
-      for (Integer task : entry.getValue()) {
-        handleSourceTaskFinished(entry.getKey(), task);
-      }
-    }
+    // allow scheduling
+    configured.set(true);
+    getContext().doneReconfiguringVertex();
+    trySchedulingPendingCompletions();
   }
   
-  boolean ready() {
-    int target = getContext().getInputVertexEdgeProperties().size() + 1;
-    Preconditions.checkState(numSignalsToWaitFor <= target);
-    return (numSignalsToWaitFor == target);
+  private boolean readyToSchedule() {
+    return (configured.get() && started.get());
+  }
+  
+  private void trySchedulingPendingCompletions() {
+    if (readyToSchedule() && !pendingCompletions.isEmpty()) {
+      for (Map.Entry<String, Collection<Integer>> entry : pendingCompletions.asMap().entrySet()) {
+        for (Integer i : entry.getValue()) {
+          onSourceTaskCompleted(entry.getKey(), i);
+        }
+      }
+    }
   }
   
   @Override
   public void initialize() {
+    // this will prevent vertex from starting until we notify we are done
+    getContext().vertexReconfigurationPlanned();
     Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
     // wait for sources and self to start
-    numSignalsToWaitFor = 0;
+    numConfiguredSources = 0;
+    configured = new AtomicBoolean(false);
+    started = new AtomicBoolean(false);
     for (String entry : edges.keySet()) {
       getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED));
     }
@@ -149,24 +168,33 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
   
   @Override
   public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
-    numSignalsToWaitFor++;
-    LOG.info("Received configured signal from: " + stateUpdate.getVertexName() + 
-        " numConfiguredSources: " + numSignalsToWaitFor);
-    start();
+    numConfiguredSources++;
+    int target = getContext().getInputVertexEdgeProperties().size();
+    LOG.info("For vertex: " + getContext().getVertexName() + "Received configured signal from: "
+        + stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources
+        + " needed: " + target);
+    Preconditions.checkState(numConfiguredSources <= target, "Vertex: " + getContext().getVertexName());
+    if (numConfiguredSources == target) {
+      configure();
+    }
   }
 
   @Override
   public synchronized void onVertexStarted(Map<String, List<Integer>> completions) {
     for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
       pendingCompletions.putAll(entry.getKey(), entry.getValue());
-    }
-    numSignalsToWaitFor++;
-    start();
+    }    
+
+    // allow scheduling
+    started.set(true);
+    
+    trySchedulingPendingCompletions();
   }
 
   @Override
   public synchronized void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
-    if (ready()) {
+    if (readyToSchedule()) {
+      // configured and started. try to schedule
       handleSourceTaskFinished(srcVertexName, taskId);
     } else {
       pendingCompletions.put(srcVertexName, taskId);

http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index 9a83a51..8de747d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -31,6 +31,7 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
@@ -82,8 +83,11 @@ public class TestInputReadyVertexManager {
     
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
-    // first source vertex configured
+    verify(mockContext, times(1)).vertexReconfigurationPlanned();
+    // source vertex configured
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    verify(mockContext, times(1)).doneReconfiguringVertex();
+    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
     // then own vertex started
     manager.onVertexStarted(initialCompletions);
     manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
@@ -119,10 +123,12 @@ public class TestInputReadyVertexManager {
     
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
-    // first own vertex started
-    manager.onVertexStarted(initialCompletions);
-    // then source vertex configured
+    verify(mockContext, times(1)).vertexReconfigurationPlanned();
+    // source vertex configured
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    verify(mockContext, times(1)).doneReconfiguringVertex();
+    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+    manager.onVertexStarted(initialCompletions);
     verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
     Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
@@ -174,17 +180,19 @@ public class TestInputReadyVertexManager {
     
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
-    // first own vertex started
-    manager.onVertexStarted(initialCompletions);
+    verify(mockContext, times(1)).vertexReconfigurationPlanned();
     verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+    // ok to have source task complete before anything else
     manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+    // first own vertex started
+    manager.onVertexStarted(initialCompletions);
+    // no scheduling as we are not configured yet
     verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
     // then source vertex configured. now we start
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    verify(mockContext, times(1)).doneReconfiguringVertex();
+    
     verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
-    Assert.assertEquals(2, requestCaptor.getAllValues().size());
-    Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
     manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
     verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
@@ -247,58 +255,48 @@ public class TestInputReadyVertexManager {
     
     Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
     
-    // 1-1 sources do not match managed tasks before vertex started
+    // 1-1 sources do not match managed tasks. setParallelism called to make them match
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    verify(mockContext, times(1)).vertexReconfigurationPlanned();
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
-    try {
-      manager.onVertexStarted(initialCompletions);
-      Assert.assertTrue("Should have exception", false);
-    } catch (TezUncheckedException e) {
-      e.getMessage().contains("Managed task number must equal 1-1 source");
-    }
-
-    // 1-1 sources do not match managed tasks after vertex started
-    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
-    manager = new InputReadyVertexManager(mockContext);
-    manager.initialize();
-    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
-    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    verify(mockContext, times(1)).setVertexParallelism(3, null, null, null);
+    verify(mockContext, times(1)).doneReconfiguringVertex();
     manager.onVertexStarted(initialCompletions);
-    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
-    try {
-      manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
-      Assert.assertTrue("Should have exception", false);
-    } catch (TezUncheckedException e) {
-      e.getMessage().contains("Managed task number must equal 1-1 source");
-    }
     
     // 1-1 sources do not match
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(4);
     manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    verify(mockContext, times(2)).vertexReconfigurationPlanned();
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
-    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     try {
-      manager.onVertexStarted(initialCompletions);
+      manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
       Assert.assertTrue("Should have exception", false);
     } catch (TezUncheckedException e) {
       e.getMessage().contains("1-1 source vertices must have identical concurrency");
     }
+    verify(mockContext, times(1)).setVertexParallelism(anyInt(), (VertexLocationHint) any(),
+        anyMap(), anyMap()); // not invoked
+    
+    when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
     
     initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
     initialCompletions.put(mockSrcVertexId2, Collections.singletonList(0));
-    when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
     manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    verify(mockContext, times(3)).vertexReconfigurationPlanned();
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+    verify(mockContext, times(1)).setVertexParallelism(anyInt(), (VertexLocationHint) any(),
+        anyMap(), anyMap()); // not invoked
+    verify(mockContext, times(2)).doneReconfiguringVertex();
     manager.onVertexStarted(initialCompletions);
     // all 1-1 0's done but not scheduled because v1 is not done
     manager.onSourceTaskCompleted(mockSrcVertexId3, 0);


[8/8] tez git commit: TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration. (sseth)

Posted by ss...@apache.org.
TEZ-2019. Temporarily allow the scheduler and launcher to be specified
via configuration. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f49c054c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f49c054c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f49c054c

Branch: refs/heads/TEZ-2003
Commit: f49c054c95e0d3ff6d72e681da526aa95b64b300
Parents: b804b8f
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Jan 30 16:02:32 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Feb 5 12:59:53 2015 -0800

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  4 +++
 .../apache/tez/dag/api/TezConfiguration.java    |  2 ++
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 30 ++++++++++++++++-
 .../dag/app/rm/TaskSchedulerEventHandler.java   | 35 ++++++++++++++++++--
 .../org/apache/tez/runtime/task/TezChild.java   |  3 +-
 5 files changed, 70 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f49c054c/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
new file mode 100644
index 0000000..1822fcb
--- /dev/null
+++ b/TEZ-2003-CHANGES.txt
@@ -0,0 +1,4 @@
+ALL CHANGES:
+  TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration.
+
+INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/f49c054c/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 43307cb..d85e19a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -972,4 +972,6 @@ public class TezConfiguration extends Configuration {
       + "allow.disabled.timeline-domains";
   public static final boolean TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT = false;
 
+  public static final String TEZ_AM_CONTAINER_LAUNCHER_CLASS = TEZ_AM_PREFIX + "container-launcher.class";
+  public static final String TEZ_AM_TASK_SCHEDULER_CLASS = TEZ_AM_PREFIX + "task-scheduler.class";
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f49c054c/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index e1ab3b7..ac0e960 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -27,6 +27,8 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -89,6 +91,7 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.AsyncDispatcher;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezConverterUtils;
 import org.apache.tez.common.TezUtilsInternal;
@@ -917,9 +920,34 @@ public class DAGAppMaster extends AbstractService {
   protected ContainerLauncher
       createContainerLauncher(final AppContext context) throws UnknownHostException {
     if(isLocal){
+      LOG.info("Creating LocalContainerLauncher");
       return new LocalContainerLauncher(context, taskAttemptListener, workingDirectory);
     } else {
-      return new ContainerLauncherImpl(context);
+      // TODO: Temporary reflection with specific parameters until a clean interface is defined.
+      String containerLauncherClassName = getConfig().get(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS);
+      if (containerLauncherClassName == null) {
+        LOG.info("Creating Default Container Launcher");
+        return new ContainerLauncherImpl(context);
+      } else {
+        LOG.info("Creating container launcher : " + containerLauncherClassName);
+        Class<? extends ContainerLauncher> containerLauncherClazz = (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
+            containerLauncherClassName);
+        try {
+          Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
+              .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class);
+          ctor.setAccessible(true);
+          ContainerLauncher instance = ctor.newInstance(context, getConfig(), taskAttemptListener);
+          return instance;
+        } catch (NoSuchMethodException e) {
+          throw new TezUncheckedException(e);
+        } catch (InvocationTargetException e) {
+          throw new TezUncheckedException(e);
+        } catch (InstantiationException e) {
+          throw new TezUncheckedException(e);
+        } catch (IllegalAccessException e) {
+          throw new TezUncheckedException(e);
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f49c054c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 625b09e..03037fa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.dag.app.rm;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -28,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.TaskLocationHint;
@@ -315,12 +319,39 @@ public class TaskSchedulerEventHandler extends AbstractService
     boolean isLocal = getConfig().getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
         TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
     if (isLocal) {
+      LOG.info("Using TaskScheduler: LocalTaskSchedulerService");
       return new LocalTaskSchedulerService(this, this.containerSignatureMatcher,
           host, port, trackingUrl, appContext);
     }
     else {
-      return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
-          host, port, trackingUrl, appContext);
+      String schedulerClassName = getConfig().get(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS);
+      if (schedulerClassName == null) {
+        LOG.info("Using TaskScheduler: YarnTaskSchedulerService");
+        return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
+            host, port, trackingUrl, appContext);
+      } else {
+        LOG.info("Using custom TaskScheduler: " + schedulerClassName);
+        // TODO Temporary reflection with specific parameters. Remove once there is a clean interface.
+        Class<? extends TaskSchedulerService> taskSchedulerClazz =
+            (Class<? extends TaskSchedulerService>) ReflectionUtils.getClazz(schedulerClassName);
+        try {
+          Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
+              .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
+                  Integer.class, String.class, Configuration.class);
+          ctor.setAccessible(true);
+          TaskSchedulerService taskSchedulerService =
+              ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig());
+          return taskSchedulerService;
+        } catch (NoSuchMethodException e) {
+          throw new TezUncheckedException(e);
+        } catch (InvocationTargetException e) {
+          throw new TezUncheckedException(e);
+        } catch (InstantiationException e) {
+          throw new TezUncheckedException(e);
+        } catch (IllegalAccessException e) {
+          throw new TezUncheckedException(e);
+        }
+      }
     }
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/f49c054c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index d537846..6164e52 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -356,7 +356,8 @@ public class TezChild {
       DefaultMetricsSystem.shutdown();
       if (!isLocal) {
         RPC.stopProxy(umbilical);
-        LogManager.shutdown();
+        // TODO Temporary change. Revert. Ideally, move this over to the main method in TezChild if possible.
+//        LogManager.shutdown();
       }
     }
   }


[5/8] tez git commit: TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria before sending notification (bikas)

Posted by ss...@apache.org.
TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria before sending notification (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7096d8a8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7096d8a8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7096d8a8

Branch: refs/heads/TEZ-2003
Commit: 7096d8a8f409e3db4f568230a959432e2c9cbb78
Parents: cfa637a
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 4 11:10:23 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 4 11:10:23 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 12 ++--
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 72 ++++++++++++++++++++
 3 files changed, 80 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7096d8a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 24c5b32..39d7f81 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -175,6 +175,8 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria
+  before sending notification
   TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism 
   TEZ-1934. TestAMRecovery may fail due to the execution order is not determined.
   TEZ-1642. TestAMRecovery sometimes fail.

http://git-wip-us.apache.org/repos/asf/tez/blob/7096d8a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c3f4ae7..9deccd2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1560,11 +1560,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       Preconditions.checkState(vertexToBeReconfiguredByManager, "doneReconfiguringVertex() can be "
           + "invoked only after vertexReconfigurationPlanned() is invoked");
       this.vertexToBeReconfiguredByManager = false;
-      // TEZ-2015 VM may not have configured everything eg. input edge. maybeSendConfiguredEvent()
-      if (completelyConfiguredSent.compareAndSet(false, true)) {
-        // vertex already started and at that time this event was not sent. Send now.
-        stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertexName,
-            org.apache.tez.dag.api.event.VertexState.CONFIGURED));
+      if (canInitVertex()) {
+        maybeSendConfiguredEvent();
+      } else {
+        Preconditions.checkState(getInternalState() == VertexState.INITIALIZING, "Vertex: "
+            + getLogIdentifier());
       }
     } finally {
       writeLock.unlock();
@@ -3302,7 +3302,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private void maybeSendConfiguredEvent() {
     // the vertex is fully configured by the time it starts. Always notify completely configured
     // unless the vertex manager has told us that it is going to reconfigure it further
-    Preconditions.checkState(canInitVertex());
+    Preconditions.checkState(canInitVertex(), "Vertex: " + getLogIdentifier());
     if (!this.vertexToBeReconfiguredByManager) {
       // this vertex will not be reconfigured by its manager
       if (completelyConfiguredSent.compareAndSet(false, true)) {

http://git-wip-us.apache.org/repos/asf/tez/blob/7096d8a8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 645708e..e94bb17 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -4949,6 +4949,78 @@ public class TestVertexImpl {
     Assert.assertNotNull(vB.getTask(0));
     Assert.assertNotNull(vC.getTask(0));
   }
+  
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testVertexConfiguredDoneByVMBeforeEdgeDefined() throws Exception {
+    // Race when a source vertex manages to start before the target vertex has
+    // been initialized
+    setupPreDagCreation();
+    dagPlan = createSamplerDAGPlan(true);
+    setupPostDagCreation();
+    
+    VertexImpl vA = vertices.get("A");
+    VertexImpl vB = vertices.get("B");
+    VertexImpl vC = vertices.get("C");
+
+    TestUpdateListener listener = new TestUpdateListener();
+    updateTracker
+        .registerForVertexUpdates(vB.getName(),
+            EnumSet.of(org.apache.tez.dag.api.event.VertexState.CONFIGURED),
+            listener);
+
+    // fudge the vm so we can do custom stuff
+    vB.vertexManager = new VertexManager(
+        VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()),
+        vB, appContext, mock(StateChangeNotifier.class));
+    
+    vB.vertexReconfigurationPlanned();
+    
+    dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+      VertexEventType.V_INIT));
+    dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+      VertexEventType.V_START));
+
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITIALIZING, vA.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vB.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vC.getState());
+    
+    // setting the edge manager should allow vA to start
+    EdgeManagerPluginDescriptor mockEdgeManagerDescriptor =
+        EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
+    Edge e = vC.sourceVertices.get(vA);
+    Assert.assertNull(e.getEdgeManager());
+    e.setCustomEdgeManager(mockEdgeManagerDescriptor);
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, vA.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vB.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vC.getState());
+    
+    // vB is not configured yet. Edge to C is not configured. So it should not send configured event
+    // even though the VM says it is done reconfiguring the vertex
+    vB.doneReconfiguringVertex();
+    Assert.assertEquals(0, listener.events.size());
+    
+    // complete configuration and verify getting configured signal from vB
+    Map<String, EdgeManagerPluginDescriptor> edges = Maps.newHashMap();
+    edges.put("B", mockEdgeManagerDescriptor);
+    vC.setParallelism(2, vertexLocationHint, edges, null, true);
+
+    dispatcher.await();
+    Assert.assertEquals(1, listener.events.size());
+    Assert.assertEquals(vB.getName(), listener.events.get(0).getVertexName());
+    Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.CONFIGURED,
+        listener.events.get(0).getVertexState());
+    updateTracker.unregisterForVertexUpdates(vB.getName(), listener);
+    
+    Assert.assertEquals(VertexState.RUNNING, vA.getState());
+    Assert.assertEquals(VertexState.RUNNING, vB.getState());
+    Assert.assertEquals(VertexState.RUNNING, vC.getState());
+    Assert.assertNotNull(vA.getTask(0));
+    Assert.assertNotNull(vB.getTask(0));
+    Assert.assertNotNull(vC.getTask(0));
+  }
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)


[2/8] tez git commit: TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism (bikas)

Posted by ss...@apache.org.
TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b7268699
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b7268699
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b7268699

Branch: refs/heads/TEZ-2003
Commit: b7268699e684fc4d1ebc00b47d8f1f7e4163bf97
Parents: 6ba1339
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Jan 30 17:46:27 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Jan 30 17:46:27 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  1 +
 .../vertexmanager/InputReadyVertexManager.java  | 66 ++++++++++++---
 .../TestInputReadyVertexManager.java            | 87 +++++++++++++++++++-
 4 files changed, 140 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b7268699/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 43d009d..5c0bec0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -173,6 +173,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism 
   TEZ-1934. TestAMRecovery may fail due to the execution order is not determined.
   TEZ-1642. TestAMRecovery sometimes fail.
   TEZ-1931. Publish tez version info to Timeline.

http://git-wip-us.apache.org/repos/asf/tez/blob/b7268699/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index f26e4ae..577c98b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1560,6 +1560,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       Preconditions.checkState(vertexToBeReconfiguredByManager, "doneReconfiguringVertex() can be "
           + "invoked only after vertexReconfigurationPlanned() is invoked");
       this.vertexToBeReconfiguredByManager = false;
+      // TEZ-2015 VM may not have configured everything eg. input edge. maybeSendConfiguredEvent()
       if (completelyConfiguredSent.compareAndSet(false, true)) {
         // vertex already started and at that time this event was not sent. Send now.
         stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertexName,

http://git-wip-us.apache.org/repos/asf/tez/blob/b7268699/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index 11185ee..f5c187e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.dag.library.vertexmanager;
 
+import java.util.Collection;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 
@@ -32,11 +34,16 @@ import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 
 @Private
 public class InputReadyVertexManager extends VertexManagerPlugin {
@@ -48,6 +55,8 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
   int oneToOneSrcTasksDoneCount[];
   TaskLocationHint oneToOneLocationHints[];
   int numOneToOneEdges;
+  int numSignalsToWaitFor;
+  Multimap<String, Integer> pendingCompletions = LinkedListMultimap.create();
 
   public InputReadyVertexManager(VertexManagerPluginContext context) {
     super(context);
@@ -67,12 +76,10 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
     }
   }
   
-  @Override
-  public void initialize() {
-  }
-
-  @Override
-  public void onVertexStarted(Map<String, List<Integer>> completions) {
+  void start() {
+    if (!ready()) {
+      return;
+    }
     int numManagedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
     LOG.info("Managing " + numManagedTasks + " tasks for vertex: " + getContext().getVertexName());
     taskIsStarted = new boolean[numManagedTasks];
@@ -117,24 +124,61 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
       oneToOneLocationHints = new TaskLocationHint[oneToOneSrcTaskCount];
     }
 
-    for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
+    for (Map.Entry<String, Collection<Integer>> entry :  pendingCompletions.asMap().entrySet()) {
       for (Integer task : entry.getValue()) {
         handleSourceTaskFinished(entry.getKey(), task);
       }
     }
   }
+  
+  boolean ready() {
+    int target = getContext().getInputVertexEdgeProperties().size() + 1;
+    Preconditions.checkState(numSignalsToWaitFor <= target);
+    return (numSignalsToWaitFor == target);
+  }
+  
+  @Override
+  public void initialize() {
+    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+    // wait for sources to be configured and self to start
+    numSignalsToWaitFor = 0;
+    for (String entry : edges.keySet()) {
+      getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED));
+    }
+  }
+  
+  @Override
+  public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+    numSignalsToWaitFor++;
+    LOG.info("Received configured signal from: " + stateUpdate.getVertexName() + 
+        " numConfiguredSources: " + numSignalsToWaitFor);
+    start();
+  }
 
   @Override
-  public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
-    handleSourceTaskFinished(srcVertexName, taskId);
+  public synchronized void onVertexStarted(Map<String, List<Integer>> completions) {
+    for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
+      pendingCompletions.putAll(entry.getKey(), entry.getValue());
+    }
+    numSignalsToWaitFor++;
+    start();
+  }
+
+  @Override
+  public synchronized void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+    if (ready()) {
+      handleSourceTaskFinished(srcVertexName, taskId);
+    } else {
+      pendingCompletions.put(srcVertexName, taskId);
+    }
   }
 
   @Override
-  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+  public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
   }
 
   @Override
-  public void onRootVertexInitialized(String inputName,
+  public synchronized void onRootVertexInitialized(String inputName,
       InputDescriptor inputDescriptor, List<Event> events) {
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/b7268699/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index c6981ed..9a83a51 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -34,6 +34,8 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -55,7 +57,7 @@ public class TestInputReadyVertexManager {
   }
 
   @Test (timeout=5000)
-  public void testBasicScatterGather() {
+  public void testBasicScatterGather() throws Exception {
     HashMap<String, EdgeProperty> mockInputVertices = 
         new HashMap<String, EdgeProperty>();
     String mockSrcVertexId1 = "Vertex1";
@@ -80,6 +82,9 @@ public class TestInputReadyVertexManager {
     
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    // first source vertex configured
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    // then own vertex started
     manager.onVertexStarted(initialCompletions);
     manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
     verify(mockContext, times(0)).scheduleVertexTasks(anyList());
@@ -89,7 +94,7 @@ public class TestInputReadyVertexManager {
   }
   
   @Test (timeout=5000)
-  public void testBasicOneToOne() {
+  public void testBasicOneToOne() throws Exception {
     HashMap<String, EdgeProperty> mockInputVertices = 
         new HashMap<String, EdgeProperty>();
     String mockSrcVertexId1 = "Vertex1";
@@ -114,7 +119,10 @@ public class TestInputReadyVertexManager {
     
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    // first own vertex started
     manager.onVertexStarted(initialCompletions);
+    // then source vertex configured
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
     Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
@@ -139,9 +147,56 @@ public class TestInputReadyVertexManager {
     Assert.assertEquals(2, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getTaskIndex());
   }
+  
+  @Test (timeout=5000)
+  public void testDelayedConfigureOneToOne() throws Exception { // 1-1 edge: nothing may be scheduled until the source vertex reports CONFIGURED
+    HashMap<String, EdgeProperty> mockInputVertices = 
+        new HashMap<String, EdgeProperty>();
+    String mockSrcVertexId1 = "Vertex1";
+    EdgeProperty eProp1 = EdgeProperty.create(
+        EdgeProperty.DataMovementType.ONE_TO_ONE,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
+    // the managed vertex has a single 1-1 source; both report 3 tasks (stubbed below)
+    String mockManagedVertexId = "Vertex";
+    // stub the plugin context with the vertex topology described above
+    VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+    when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
+    mockInputVertices.put(mockSrcVertexId1, eProp1);
+    // source task 0 has already completed when the managed vertex starts
+    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
+    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
+    // requestCaptor is assumed to be a test-class field (declared outside this hunk) capturing schedule requests
+    InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
+    manager.initialize();
+    // own vertex starts first, while the source vertex is not yet CONFIGURED
+    manager.onVertexStarted(initialCompletions);
+    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture()); // nothing scheduled yet, not even for source task 0
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture()); // the new completion is held back as well
+    // source vertex becomes CONFIGURED: the held-back completions are now scheduled
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture()); // one call per held-back completion
+    Assert.assertEquals(2, requestCaptor.getAllValues().size()); // getAllValues() spans every capture so far; the times(0) checks added none
+    Assert.assertEquals(1, requestCaptor.getValue().size());
+    Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue()); // the latest request is for managed task 1
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
+    verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture()); // once CONFIGURED, a completion is scheduled right away
+    Assert.assertEquals(1, requestCaptor.getValue().size());
+    Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getVertexName()); // affinitized to the 1-1 source vertex...
+    Assert.assertEquals(2, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // ...task with the same index
+  }
 
   @Test (timeout=5000)
-  public void testComplex() {
+  public void testComplex() throws Exception {
     HashMap<String, EdgeProperty> mockInputVertices = 
         new HashMap<String, EdgeProperty>();
     String mockSrcVertexId1 = "Vertex1";
@@ -192,22 +247,43 @@ public class TestInputReadyVertexManager {
     
     Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
     
-    // 1-1 sources do not match managed tasks
+    // 1-1 sources do not match managed tasks before vertex started
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     try {
       manager.onVertexStarted(initialCompletions);
       Assert.assertTrue("Should have exception", false);
     } catch (TezUncheckedException e) {
       e.getMessage().contains("Managed task number must equal 1-1 source");
     }
+
+    // 1-1 sources do not match managed tasks after vertex started
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
+    manager = new InputReadyVertexManager(mockContext);
+    manager.initialize();
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStarted(initialCompletions);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
+    try {
+      manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+      Assert.assertTrue("Should have exception", false);
+    } catch (TezUncheckedException e) {
+      e.getMessage().contains("Managed task number must equal 1-1 source");
+    }
     
     // 1-1 sources do not match
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(4);
     manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     try {
       manager.onVertexStarted(initialCompletions);
       Assert.assertTrue("Should have exception", false);
@@ -220,6 +296,9 @@ public class TestInputReadyVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
     manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     manager.onVertexStarted(initialCompletions);
     // all 1-1 0's done but not scheduled because v1 is not done
     manager.onSourceTaskCompleted(mockSrcVertexId3, 0);