You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2014/12/19 03:44:30 UTC

tez git commit: TEZ-1642. TestAMRecovery sometimes fail (zjffdu)

Repository: tez
Updated Branches:
  refs/heads/master a46ecb23d -> 1dd725f79


TEZ-1642. TestAMRecovery sometimes fail (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1dd725f7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1dd725f7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1dd725f7

Branch: refs/heads/master
Commit: 1dd725f7976de91a93f7cf25fb922278c4993af8
Parents: a46ecb2
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Dec 19 10:43:04 2014 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Dec 19 10:43:04 2014 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/test/TestAMRecovery.java     | 181 +++++++++++--------
 2 files changed, 107 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1dd725f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7e1a386..05d6c39 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1642. TestAMRecovery sometimes fail.
   TEZ-1859. TestGroupedSplits has commented out test: testGzip.
   TEZ-1868. Document how to do Windows builds due to with ACL symlink build changes.
   TEZ-1872. docs/src/site/custom/project-info-report.properties needs license header.

http://git-wip-us.apache.org/repos/asf/tez/blob/1dd725f7/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index 42d6f5c..8c3eff4 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -47,12 +47,15 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
@@ -65,7 +68,9 @@ import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.recovery.RecoveryService;
 import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.processor.SimpleProcessor;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -74,6 +79,8 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 public class TestAMRecovery {
 
   private static final Log LOG = LogFactory.getLog(TestAMRecovery.class);
@@ -127,6 +134,7 @@ public class TestAMRecovery {
       try {
         LOG.info("Stopping MiniTezCluster");
         miniTezCluster.stop();
+        miniTezCluster = null;
       } catch (Exception e) {
         e.printStackTrace();
       }
@@ -184,7 +192,7 @@ public class TestAMRecovery {
 
   /**
    * Fine-grained recovery task-level, In a vertex (v1), task 0 is done task 1
-   * is running. History flush happens. AM dies. Once AM is recovered, task 0 is
+   * is not started. History flush happens. AM dies. Once AM is recovered, task 0 is
    * not re-run. Task 1 is re-run. (Broadcast)
    *
    * @throws Exception
@@ -192,17 +200,16 @@ public class TestAMRecovery {
   @Test(timeout = 120000)
   public void testVertexPartiallyFinished_Broadcast() throws Exception {
     DAG dag =
-        createDAG(ControlledInputReadyVertexManager.class,
+        createDAG(ControlledImmediateStartVertexManager.class,
             DataMovementType.BROADCAST, true);
     TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
-    assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
-    assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
     assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
     assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
     List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
     List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
+    printHistoryEvents(historyEvents1, 1);
+    printHistoryEvents(historyEvents1, 2);
     // task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
     // attempt 1
     assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -224,17 +231,16 @@ public class TestAMRecovery {
   @Test(timeout = 120000)
   public void testVertexCompletelyFinished_Broadcast() throws Exception {
     DAG dag =
-        createDAG(ControlledInputReadyVertexManager.class,
+        createDAG(ControlledImmediateStartVertexManager.class,
             DataMovementType.BROADCAST, false);
     TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
-    assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
-    assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
     assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
     assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
     List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
     List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
+    printHistoryEvents(historyEvents1, 1);
+    printHistoryEvents(historyEvents1, 2);
     // task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
     // attempt 1
     assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -248,7 +254,7 @@ public class TestAMRecovery {
 
   /**
    * Fine-grained recovery task-level, In a vertex (v1), task 0 is done task 1
-   * is running. History flush happens. AM dies. Once AM is recovered, task 0 is
+   * is not started. History flush happens. AM dies. Once AM is recovered, task 0 is
    * not re-run. Task 1 is re-run. (ONE_TO_ONE)
    *
    * @throws Exception
@@ -259,14 +265,13 @@ public class TestAMRecovery {
         createDAG(ControlledInputReadyVertexManager.class,
             DataMovementType.ONE_TO_ONE, true);
     TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
-    assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
-    assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
     assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
     assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
     List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
     List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
+    printHistoryEvents(historyEvents1, 1);
+    printHistoryEvents(historyEvents1, 2);
     // task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
     // attempt 1
     assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -292,14 +297,13 @@ public class TestAMRecovery {
         createDAG(ControlledInputReadyVertexManager.class,
             DataMovementType.ONE_TO_ONE, false);
     TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
-    assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
-    assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
     assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
     assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
     List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
     List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
+    printHistoryEvents(historyEvents1, 1);
+    printHistoryEvents(historyEvents1, 2);
     // task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
     // attempt 1
     assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -314,7 +318,7 @@ public class TestAMRecovery {
 
   /**
    * Fine-grained recovery task-level, In a vertex (v1), task 0 is done task 1
-   * is running. History flush happens. AM dies. Once AM is recovered, task 0 is
+   * is not started. History flush happens. AM dies. Once AM is recovered, task 0 is
    * not re-run. Task 1 is re-run. (SCATTER_GATHER)
    *
    * @throws Exception
@@ -325,14 +329,13 @@ public class TestAMRecovery {
         createDAG(ControlledShuffleVertexManager.class,
             DataMovementType.SCATTER_GATHER, true);
     TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
-    assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
-    assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
     assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
     assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
     List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
     List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
+    printHistoryEvents(historyEvents1, 1);
+    printHistoryEvents(historyEvents1, 2);
     // task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
     // attempt 1
     assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -358,14 +361,13 @@ public class TestAMRecovery {
         createDAG(ControlledShuffleVertexManager.class,
             DataMovementType.SCATTER_GATHER, false);
     TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
-    assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
-    assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
     assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
     assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
     List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
     List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
+    printHistoryEvents(historyEvents1, 1);
+    printHistoryEvents(historyEvents1, 2);
     // task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
     // attempt 1
     assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -407,6 +409,7 @@ public class TestAMRecovery {
 
   /**
    * v1 --> v2 <br>
+   * v1 has a customized VM which schedules only one task of v1 in the partially-finished test case.
    * v2 has a customized VM which could control when to kill AM
    *
    * @param vertexManagerClass
@@ -425,6 +428,9 @@ public class TestAMRecovery {
     DAG dag = DAG.create("dag");
     UserPayload payload = UserPayload.create(null);
     Vertex v1 = Vertex.create("v1", MyProcessor.getProcDesc(), 2);
+    v1.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
+        ScheduleControlledVertexManager.class.getName()).setUserPayload(
+        TezUtils.createUserPayloadFromConf(tezConf)));
     Vertex v2 = Vertex.create("v2", DoNothingProcessor.getProcDesc(), 2);
     v2.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
         vertexManagerClass.getName()).setUserPayload(
@@ -471,6 +477,16 @@ public class TestAMRecovery {
     return RecoveryParser.parseDAGRecoveryFile(fs.open(recoveryFilePath));
   }
 
+  private void printHistoryEvents(List<HistoryEvent> historyEvents, int attemptId) {
+    LOG.info("RecoveryLogs from attempt:" + attemptId);
+    for(HistoryEvent historyEvent : historyEvents) {
+      LOG.info("Parsed event from recovery stream"
+          + ", eventType=" + historyEvent.getEventType()
+          + ", event=" + historyEvent);
+    }
+    LOG.info("");
+  }
+
   public static class ControlledInputReadyVertexManager extends
       InputReadyVertexManager {
 
@@ -496,29 +512,16 @@ public class TestAMRecovery {
       super.onSourceTaskCompleted(srcVertexName, taskId);
       if (getContext().getDAGAttemptNumber() == 1) {
         if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
-          if (taskId == 0) {
+          if (taskId.intValue() == 0) {
             System.exit(-1);
           }
         } else {
-          if (taskId == 1) {
+          if (taskId.intValue() == 1) {
             System.exit(-1);
           }
         }
       }
     }
-
-    @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions) {
-      // sleep for 1 seconds to delay the running of task in v2.
-      // this could keep the case that task of v1 is partial finished or completely
-      // finished, and at the same time the task of v2 is not started
-      try {
-        Thread.sleep(1*1000);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-      super.onVertexStarted(completions);
-    }
   }
 
   public static class ControlledShuffleVertexManager extends
@@ -543,33 +546,19 @@ public class TestAMRecovery {
 
     @Override
     public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
-      int dagAttemptNumber = getContext().getDAGAttemptNumber();
       super.onSourceTaskCompleted(srcVertexName, taskId);
-      if (dagAttemptNumber == 1) {
+      if (getContext().getDAGAttemptNumber() == 1) {
         if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
-          if (taskId == 0) {
+          if (taskId.intValue() == 0) {
             System.exit(-1);
           }
         } else {
-          if (taskId == 1) {
+          if (taskId.intValue() == 1) {
             System.exit(-1);
           }
         }
       }
     }
-
-    @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions) {
-      // sleep for 1 seconds to delay the running of task in v2.
-      // this could keep the case that task of v1 is partial finished or completely
-      // finished, and at the same time the task of v2 is not started
-      try {
-        Thread.sleep(1*1000);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-      super.onVertexStarted(completions);
-    }
   }
 
   public static class ControlledImmediateStartVertexManager extends
@@ -598,28 +587,76 @@ public class TestAMRecovery {
       super.onSourceTaskCompleted(srcVertexName, taskId);
       if (getContext().getDAGAttemptNumber() == 1) {
         if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
-          if (taskId == 0) {
+          if (taskId.intValue() == 0) {
             System.exit(-1);
           }
         } else {
-          if (taskId == 1) {
+          if (taskId.intValue() == 1) {
             System.exit(-1);
           }
         }
       }
     }
+  }
+
+  
+  /**
+   * VertexManager which schedules only one task in the partially-finished test case.
+   *
+   */
+  public static class ScheduleControlledVertexManager extends VertexManagerPlugin {
+
+    private Configuration conf;
+
+    public ScheduleControlledVertexManager(VertexManagerPluginContext context) {
+      super(context);
+    }
 
     @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions) {
-      // sleep for 1 seconds to delay the running of task in v2.
-      // this could keep the case that task of v1 is partial finished or completely
-      // finished, and at the same time the task of v2 is not started
+    public void initialize() {
       try {
-        Thread.sleep(1*1000);
-      } catch (InterruptedException e) {
+        conf =
+            TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+      } catch (IOException e) {
         e.printStackTrace();
       }
-      super.onVertexStarted(completions);
+    }
+
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions)
+        throws Exception {
+      if (getContext().getDAGAttemptNumber() == 1) {
+        // only schedule one task if it is partiallyFinished case
+        if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
+          getContext().scheduleVertexTasks(Lists.newArrayList(new TaskWithLocationHint(0, null)));
+          return ;
+        }
+      }
+      // schedule all tasks when it is not partiallyFinished
+      int taskNum = getContext().getVertexNumTasks(getContext().getVertexName());
+      List<TaskWithLocationHint> taskWithLocationHints = new ArrayList<TaskWithLocationHint>();
+      for (int i=0;i<taskNum;++i) {
+        taskWithLocationHints.add(new TaskWithLocationHint(i, null));
+      }
+      getContext().scheduleVertexTasks(taskWithLocationHints);
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer taskId)
+        throws Exception {
+      
+    }
+
+    @Override
+    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent)
+        throws Exception {
+      
+    }
+
+    @Override
+    public void onRootVertexInitialized(String inputName,
+        InputDescriptor inputDescriptor, List<Event> events) throws Exception {
+      
     }
   }
 
@@ -663,11 +700,6 @@ public class TestAMRecovery {
     Counter_1,
   }
 
-  /**
-   * Do nothing if it is in task 0, sleep 3 seconds for other tasks. This enable
-   * us to kill AM in VM when some tasks are still running.
-   *
-   */
   public static class MyProcessor extends SimpleProcessor {
 
     public MyProcessor(ProcessorContext context) {
@@ -677,12 +709,6 @@ public class TestAMRecovery {
     @Override
     public void run() throws Exception {
       getContext().getCounters().findCounter(TestCounter.Counter_1).increment(1);
-      if (getContext().getTaskIndex() == 0) {
-        // keep task_0 running for 1 seconds to wait for task_1 start running
-        Thread.sleep(1 * 1000);;
-      } else {
-        Thread.sleep(3 * 1000);
-      }
     }
 
     public static ProcessorDescriptor getProcDesc() {
@@ -698,6 +724,11 @@ public class TestAMRecovery {
 
     @Override
     public void run() throws Exception {
+      // Sleep 3 seconds in vertex2 so that vertex2 does not complete
+      // before it gets the SourceVertexTaskAttemptCompletedEvent.
+      // SourceVertexTaskAttemptCompletedEvent will be ignored if the vertex is SUCCEEDED,
+      // in which case the AM would not be killed in the VM of vertex2.
+      Thread.sleep(3000);
     }
 
     public static ProcessorDescriptor getProcDesc() {