Posted to commits@tez.apache.org by hi...@apache.org on 2014/09/11 06:59:18 UTC

git commit: TEZ-1559. Add system tests for AM recovery. (Jeff Zhang via hitesh)

Repository: tez
Updated Branches:
  refs/heads/master dfe38490f -> 43e47bfac


TEZ-1559. Add system tests for AM recovery. (Jeff Zhang via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/43e47bfa
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/43e47bfa
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/43e47bfa

Branch: refs/heads/master
Commit: 43e47bfac239dada7564b6f8e78462143be06ec7
Parents: dfe3849
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Sep 10 21:58:20 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Sep 10 21:58:20 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   9 +-
 .../org/apache/tez/dag/app/RecoveryParser.java  |  15 +-
 .../dag/history/recovery/RecoveryService.java   |  39 ++
 .../org/apache/tez/test/TestAMRecovery.java     | 635 +++++++++++++++++++
 4 files changed, 689 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/43e47bfa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2cb7bef..0c0bb39 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
   TEZ-1544. Link to release artifacts for 0.5.0 does not point to a specific link for 0.5.0.
+  TEZ-1559. Add system tests for AM recovery.
 
 Release 0.5.1: Unreleased
 
@@ -18,7 +19,7 @@ ALL CHANGES
   TEZ-1536. Fix spelling typo "configurartion" in TezClientUtils.
   TEZ-1310. Update website documentation framework
   TEZ-1447. Provide a mechanism for InputInitializers to know about Vertex state changes.
-  TEZ-1362. Remove DAG_COMPLETED in DAGEventType. 
+  TEZ-1362. Remove DAG_COMPLETED in DAGEventType.
   TEZ-1519. TezTaskRunner should not initialize TezConfiguration in TezChild.
 
 Release 0.5.0: Unreleased
@@ -27,7 +28,7 @@ INCOMPATIBLE CHANGES
   TEZ-1038. Move TaskLocationHint outside of VertexLocationHint.
   TEZ-960. VertexManagerPluginContext::getTotalAVailableResource() changed to
   VertexManagerPluginContext::getTotalAvailableResource()
-  TEZ-1025. Rename tez.am.max.task.attempts to tez.am.task.max.failed.attempts 
+  TEZ-1025. Rename tez.am.max.task.attempts to tez.am.task.max.failed.attempts
   TEZ-1018. VertexManagerPluginContext should enable assigning locality to
   scheduled tasks
   TEZ-1169. Allow numPhysicalInputs to be specified for RootInputs.
@@ -40,8 +41,8 @@ INCOMPATIBLE CHANGES
     - Details at https://issues.apache.org/jira/browse/TEZ-1213?focusedCommentId
     =14039381&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpa
     nel#comment-14039381
-  TEZ-1080, TEZ-1272, TEZ-1279, TEZ-1266. Change YARNRunner to use EdgeConfigs. 
-    - Removes separation of runtime configs into input/ouput configs. Also 
+  TEZ-1080, TEZ-1272, TEZ-1279, TEZ-1266. Change YARNRunner to use EdgeConfigs.
+    - Removes separation of runtime configs into input/ouput configs. Also
     refactors public methods used for this conversion.
   TEZ-696. Remove implicit copying of processor payload to input and output
   TEZ-1269. TaskScheduler prematurely releases containers

http://git-wip-us.apache.org/repos/asf/tez/blob/43e47bfa/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 715d4e0..9ba5847 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -235,18 +235,18 @@ public class RecoveryParser {
 
 
 
-  private static void parseDAGRecoveryFile(FSDataInputStream inputStream)
+  public static List<HistoryEvent> parseDAGRecoveryFile(FSDataInputStream inputStream)
       throws IOException {
+    List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
     while (true) {
       HistoryEvent historyEvent = getNextEvent(inputStream);
       if (historyEvent == null) {
         LOG.info("Reached end of stream");
         break;
       }
-      LOG.info("Parsed event from recovery stream"
-          + ", eventType=" + historyEvent.getEventType()
-          + ", event=" + historyEvent);
+      historyEvents.add(historyEvent);
     }
+    return historyEvents;
   }
 
   public static void main(String argv[]) throws IOException {
@@ -264,7 +264,12 @@ public class RecoveryParser {
     parseSummaryFile(fs.open(new Path(summaryPath)));
     for (String dagPath : dagPaths) {
       LOG.info("Parsing DAG recovery file " + dagPath);
-      parseDAGRecoveryFile(fs.open(new Path(dagPath)));
+      List<HistoryEvent> historyEvents = parseDAGRecoveryFile(fs.open(new Path(dagPath)));
+      for (HistoryEvent historyEvent : historyEvents) {
+        LOG.info("Parsed event from recovery stream"
+            + ", eventType=" + historyEvent.getEventType()
+            + ", event=" + historyEvent);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/43e47bfa/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 737fde8..0034feb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -45,6 +45,8 @@ import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class RecoveryService extends AbstractService {
 
   private static final Log LOG = LogFactory.getLog(RecoveryService.class);
@@ -53,6 +55,21 @@ public class RecoveryService extends AbstractService {
   public static final String RECOVERY_FATAL_OCCURRED_DIR =
       "RecoveryFatalErrorOccurred";
 
+  /**
+   * Whether to handle the remaining events in the event queue when the AM is stopped.
+   */
+  @VisibleForTesting
+  public static final String TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED =
+      TezConfiguration.TEZ_AM_PREFIX + "recovery.handle_remaining_event_when_stopped";
+
+  /**
+   * By default, do not handle the remaining events when the AM is stopped.
+   * Setting this to true is mostly intended for recovery unit tests.
+   */
+  @VisibleForTesting
+  public static final boolean TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED_DEFAULT = false;
+
+
   private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
       new LinkedBlockingQueue<DAGHistoryEvent>();
   private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>();
@@ -75,6 +92,7 @@ public class RecoveryService extends AbstractService {
   private int maxUnflushedEvents;
   private int flushInterval;
   private AtomicBoolean recoveryFatalErrorOccurred = new AtomicBoolean(false);
+  private boolean handleRemainingEventWhenStopped;
 
   public RecoveryService(AppContext appContext) {
     super(RecoveryService.class.getName());
@@ -93,6 +111,10 @@ public class RecoveryService extends AbstractService {
         TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT);
     maxUnflushedEvents = conf.getInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS,
         TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT);
+
+    handleRemainingEventWhenStopped = conf.getBoolean(
+        TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED,
+        TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED_DEFAULT);
   }
 
   @Override
@@ -150,11 +172,28 @@ public class RecoveryService extends AbstractService {
   @Override
   public void serviceStop() {
     LOG.info("Stopping RecoveryService");
+
     stopped.set(true);
     if (eventHandlingThread != null) {
       eventHandlingThread.interrupt();
     }
 
+    if (handleRemainingEventWhenStopped) {
+      LOG.info("Handle the remaining events in queue, queue size=" + eventQueue.size());
+      while(!eventQueue.isEmpty()) {
+        synchronized (lock) {
+          try {
+            DAGHistoryEvent event = eventQueue.take();
+            handleRecoveryEvent(event);
+          } catch (Exception e) {
+            // For now, ignore any such errors as these are non-critical
+            // All summary event related errors are handled as critical
+            LOG.warn("Error handling recovery event", e);
+          }
+        }
+      }
+    }
+
     if (summaryStream != null) {
       try {
         LOG.info("Closing Summary Stream");

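The new flag is off by default and is meant to be switched on only by recovery tests. A minimal sketch of how a test configuration might enable it, mirroring the setup() method in the TestAMRecovery diff below (the surrounding configuration values are illustrative):

    // Illustrative sketch only, following TestAMRecovery#setup() below:
    // flush every recovery event immediately and drain the queue on AM stop.
    TezConfiguration tezConf = new TezConfiguration();
    tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
    tezConf.setBoolean(
        RecoveryService.TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED, true);
    TezClient tezSession = TezClient.create("TestDAGRecovery", tezConf);
    tezSession.start();
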
http://git-wip-us.apache.org/repos/asf/tez/blob/43e47bfa/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
new file mode 100644
index 0000000..eae38f0
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -0,0 +1,635 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.app.RecoveryParser;
+import org.apache.tez.dag.app.dag.impl.ImmediateStartVertexManager;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.recovery.RecoveryService;
+import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestAMRecovery {
+
+  private static final Log LOG = LogFactory.getLog(TestAMRecovery.class);
+
+  private static Configuration conf = new Configuration();
+  private static TezConfiguration tezConf;
+  private static int MAX_AM_ATTEMPT = 50;
+  private static MiniTezCluster miniTezCluster = null;
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestAMRecovery.class.getName() + "-tmpDir";
+  private static MiniDFSCluster dfsCluster = null;
+  private static TezClient tezSession = null;
+  private static FileSystem remoteFs = null;
+  private static String FAIL_ON_PARTIAL_FINISHED = "FAIL_ON_PARTIAL_COMPLETED";
+  private static String FAIL_ON_ATTEMPT = "FAIL_ON_ATTEMPT";
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    LOG.info("Starting mini clusters");
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster =
+          new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true)
+              .racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+    if (miniTezCluster == null) {
+      miniTezCluster =
+          new MiniTezCluster(TestAMRecovery.class.getName(), 1, 1, 1);
+      Configuration miniTezconf = new Configuration(conf);
+      miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, MAX_AM_ATTEMPT);
+      miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      miniTezCluster.init(miniTezconf);
+      miniTezCluster.start();
+    }
+  }
+
+  @AfterClass
+  public static void afterClass() throws InterruptedException {
+    if (tezSession != null) {
+      try {
+        LOG.info("Stopping Tez Session");
+        tezSession.stop();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+    Thread.sleep(10000);
+    if (miniTezCluster != null) {
+      try {
+        LOG.info("Stopping MiniTezCluster");
+        miniTezCluster.stop();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+    if (dfsCluster != null) {
+      try {
+        LOG.info("Stopping DFSCluster");
+        dfsCluster.shutdown();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  @Before
+  public void setup() throws Exception {
+    LOG.info("Starting session");
+    Path remoteStagingDir =
+        remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
+            .valueOf(new Random().nextInt(100000))));
+    TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+
+    tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
+    tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO");
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+    tezConf
+        .setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+    tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, MAX_AM_ATTEMPT);
+    tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
+    tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
+    tezConf.setBoolean(
+        TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false);
+    tezConf.setBoolean(
+        RecoveryService.TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED,
+        true);
+    tezSession = TezClient.create("TestDAGRecovery", tezConf);
+    tezSession.start();
+  }
+
+  @After
+  public void teardown() throws InterruptedException {
+    if (tezSession != null) {
+      try {
+        LOG.info("Stopping Tez Session");
+        tezSession.stop();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+    tezSession = null;
+    Thread.sleep(10000);
+  }
+
+  /**
+   * Fine-grained task-level recovery. In a vertex (v1), task 0 is done and task 1
+   * is running when the history flush happens and the AM dies. Once the AM is
+   * recovered, task 0 is not re-run and task 1 is re-run. (Broadcast)
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testVertexPartiallyFinished_Broadcast() throws Exception {
+    DAG dag =
+        createDAG(ControlledInputReadyVertexManager.class,
+            DataMovementType.BROADCAST, true);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+
+    // task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
+    // attempt 1
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
+    assertEquals(0, findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
+
+    // task_0 of v1 is finished in attempt 1 and not rerun, task_1 of v1 is
+    // finished in attempt 2
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
+  }
+
+  /**
+   * Fine-grained task-level recovery. In a vertex (v1), task 0 and task 1 are
+   * both done when the history flush happens and the AM dies. Once the AM is
+   * recovered, neither task 0 nor task 1 is re-run. (Broadcast)
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testVertexCompletelyFinished_Broadcast() throws Exception {
+    DAG dag =
+        createDAG(ControlledInputReadyVertexManager.class,
+            DataMovementType.BROADCAST, false);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+
+    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+
+    // task_0 of v1 is finished in attempt 1, task_1 of v1 is also finished in
+    // attempt 1
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
+
+    // neither task_0 nor task_1 of v1 is rerun in attempt 2; each still has
+    // exactly one finished task attempt recorded
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
+  }
+
+  /**
+   * Fine-grained task-level recovery. In a vertex (v1), task 0 is done and task 1
+   * is running when the history flush happens and the AM dies. Once the AM is
+   * recovered, task 0 is not re-run and task 1 is re-run. (ONE_TO_ONE)
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testVertexPartialComplete_One2One() throws Exception {
+    DAG dag =
+        createDAG(ControlledInputReadyVertexManager.class,
+            DataMovementType.ONE_TO_ONE, true);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+
+    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+
+    // task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
+    // attempt 1
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
+    assertEquals(0, findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
+
+    // task_0 of v1 is finished in attempt 1 and not rerun, task_1 of v1 is
+    // finished in attempt 2
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
+
+  }
+
+  /**
+   * Fine-grained task-level recovery. In a vertex (v1), task 0 and task 1 are
+   * both done when the history flush happens and the AM dies. Once the AM is
+   * recovered, neither task 0 nor task 1 is re-run. (ONE_TO_ONE)
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testVertexCompletelyComplete_One2One() throws Exception {
+    DAG dag =
+        createDAG(ControlledInputReadyVertexManager.class,
+            DataMovementType.ONE_TO_ONE, false);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+
+    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+
+    // task_0 of v1 is finished in attempt 1, task_1 of v1 is also finished in
+    // attempt 1
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
+
+    // neither task_0 nor task_1 of v1 is rerun in attempt 2; each still has
+    // exactly one finished task attempt recorded
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
+
+  }
+
+  /**
+   * Fine-grained task-level recovery. In a vertex (v1), task 0 is done and task 1
+   * is running when the history flush happens and the AM dies. Once the AM is
+   * recovered, task 0 is not re-run and task 1 is re-run. (SCATTER_GATHER)
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testVertexPartiallyFinished_ScatterGather() throws Exception {
+    DAG dag =
+        createDAG(ControlledShuffleVertexManager.class,
+            DataMovementType.SCATTER_GATHER, true);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+
+    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+
+    // task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
+    // attempt 1
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
+    assertEquals(0, findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
+
+    // task_0 of v1 is finished in attempt 1 and not rerun, task_1 of v1 is
+    // finished in attempt 2
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
+
+  }
+
+  /**
+   * Fine-grained task-level recovery. In a vertex (v1), task 0 and task 1 are
+   * both done when the history flush happens and the AM dies. Once the AM is
+   * recovered, neither task 0 nor task 1 is re-run. (SCATTER_GATHER)
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testVertexCompletelyFinished_ScatterGather() throws Exception {
+    DAG dag =
+        createDAG(ControlledShuffleVertexManager.class,
+            DataMovementType.SCATTER_GATHER, false);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+
+    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+
+    // task_0 of v1 is finished in attempt 1, task_1 of v1 is also finished in
+    // attempt 1
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
+
+    // neither task_0 nor task_1 of v1 is rerun in attempt 2; each still has
+    // exactly one finished task attempt recorded
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
+  }
+
+  /**
+   * Set the AM max attempts to a high number and kill many attempts. The last
+   * AM attempt can still be recovered from the latest AM history data.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 600000)
+  public void testHighMaxAttempt() throws Exception {
+    Random rand = new Random();
+    tezConf.set(FAIL_ON_ATTEMPT, rand.nextInt(MAX_AM_ATTEMPT) + "");
+    LOG.info("Set FAIL_ON_ATTEMPT=" + tezConf.get(FAIL_ON_ATTEMPT));
+    DAG dag =
+        createDAG(FailOnAttemptVertexManager.class,
+            DataMovementType.SCATTER_GATHER, false);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+
+  }
+
+  void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
+    tezSession.waitTillReady();
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus =
+        dagClient.waitForCompletionWithStatusUpdates(EnumSet
+            .of(StatusGetOpts.GET_COUNTERS));
+    Assert.assertEquals(finalState, dagStatus.getState());
+  }
+
+  /**
+   * v1 --> v2 <br>
+   * v2 has a customized VertexManager which controls when to kill the AM
+   *
+   * @param vertexManagerClass
+   * @param dmType
+   * @param failOnPartiallyCompleted
+   * @return
+   * @throws IOException
+   */
+  private DAG createDAG(Class vertexManagerClass, DataMovementType dmType,
+      boolean failOnPartiallyCompleted) throws IOException {
+    if (failOnPartiallyCompleted) {
+      tezConf.set(FAIL_ON_PARTIAL_FINISHED, "true");
+    } else {
+      tezConf.set(FAIL_ON_PARTIAL_FINISHED, "false");
+    }
+    DAG dag = DAG.create("dag");
+    UserPayload payload = UserPayload.create(null);
+    Vertex v1 = Vertex.create("v1", MyProcessor.getProcDesc(), 2);
+    Vertex v2 = Vertex.create("v2", DoNothingProcessor.getProcDesc(), 2);
+    v2.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
+        vertexManagerClass.getName()).setUserPayload(
+        TezUtils.createUserPayloadFromConf(tezConf)));
+
+    dag.addVertex(v1).addVertex(v2);
+    dag.addEdge(Edge.create(v1, v2, EdgeProperty.create(dmType,
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+        TestOutput.getOutputDesc(payload), TestInput.getInputDesc(payload))));
+    return dag;
+  }
+
+  private List<TaskAttemptFinishedEvent> findTaskAttemptFinishedEvent(
+      List<HistoryEvent> historyEvents, int vertexId, int taskId) {
+    List<TaskAttemptFinishedEvent> resultEvents =
+        new ArrayList<TaskAttemptFinishedEvent>();
+    for (HistoryEvent historyEvent : historyEvents) {
+      if (historyEvent.getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) {
+        TaskAttemptFinishedEvent taFinishedEvent =
+            (TaskAttemptFinishedEvent) historyEvent;
+        if (taFinishedEvent.getTaskAttemptID().getTaskID().getVertexID()
+            .getId() == vertexId
+            && taFinishedEvent.getTaskAttemptID().getTaskID().getId() == taskId) {
+          resultEvents.add(taFinishedEvent);
+        }
+      }
+    }
+    return resultEvents;
+  }
+
+  private List<HistoryEvent> readRecoveryLog(int attemptNum) throws IOException {
+    ApplicationId appId = tezSession.getAppMasterApplicationId();
+    Path tezSystemStagingDir =
+        TezCommonUtils.getTezSystemStagingPath(tezConf, appId.toString());
+    Path recoveryDataDir =
+        TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf);
+    FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf);
+    Path currentAttemptRecoveryDataDir =
+        TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, attemptNum);
+    Path recoveryFilePath =
+        new Path(currentAttemptRecoveryDataDir, appId.toString().replace(
+            "application", "dag")
+            + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+    return RecoveryParser.parseDAGRecoveryFile(fs.open(recoveryFilePath));
+  }
+
+  public static class ControlledInputReadyVertexManager extends
+      InputReadyVertexManager {
+
+    private Configuration conf;
+
+    public ControlledInputReadyVertexManager(VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() {
+      super.initialize();
+      try {
+        conf =
+            TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+      super.onSourceTaskCompleted(srcVertexName, taskId);
+      if (getContext().getDAGAttemptNumber() == 1) {
+        if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
+          if (taskId == 0) {
+            System.exit(-1);
+          }
+        } else {
+          if (taskId == 1) {
+            System.exit(-1);
+          }
+        }
+      }
+    }
+  }
+
+  public static class ControlledShuffleVertexManager extends
+      ShuffleVertexManager {
+
+    private Configuration conf;
+
+    public ControlledShuffleVertexManager(VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() {
+      super.initialize();
+      try {
+        conf =
+            TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+      super.onSourceTaskCompleted(srcVertexName, taskId);
+      if (getContext().getDAGAttemptNumber() == 1) {
+        if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
+          if (taskId == 0) {
+            System.exit(-1);
+          }
+        } else {
+          if (taskId == 1) {
+            System.exit(-1);
+          }
+        }
+      }
+    }
+  }
+
+  public static class ControlledImmediateStartVertexManager extends
+      ImmediateStartVertexManager {
+
+    private Configuration conf;
+
+    public ControlledImmediateStartVertexManager(
+        VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() {
+      super.initialize();
+      try {
+        conf =
+            TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+      super.onSourceTaskCompleted(srcVertexName, taskId);
+      if (getContext().getDAGAttemptNumber() == 1) {
+        if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
+          if (taskId == 0) {
+            System.exit(-1);
+          }
+        } else {
+          if (taskId == 1) {
+            System.exit(-1);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * VertexManager which kills the AM on every DAG attempt whose number is less
+   * than a specified attempt number.
+   *
+   */
+  public static class FailOnAttemptVertexManager extends ShuffleVertexManager {
+
+    private Configuration conf;
+
+    public FailOnAttemptVertexManager(VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() {
+      super.initialize();
+      try {
+        conf =
+            TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+      super.onSourceTaskCompleted(srcVertexName, taskId);
+      int curAttempt = getContext().getDAGAttemptNumber();
+      int failOnAttempt = conf.getInt(FAIL_ON_ATTEMPT, 1);
+      LOG.info("failOnAttempt:" + failOnAttempt);
+      LOG.info("curAttempt:" + curAttempt);
+      if (curAttempt < failOnAttempt) {
+        System.exit(-1);
+      }
+    }
+  }
+
+  /**
+   * Do nothing in task 0 and sleep 3 seconds in the other tasks. This enables
+   * the VertexManager to kill the AM while some tasks are still running.
+   *
+   */
+  public static class MyProcessor extends SimpleProcessor {
+
+    public MyProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      if (getContext().getTaskIndex() == 0) {
+        return;
+      } else {
+        Thread.sleep(3 * 1000);
+      }
+    }
+
+    public static ProcessorDescriptor getProcDesc() {
+      return ProcessorDescriptor.create(MyProcessor.class.getName());
+    }
+  }
+
+  public static class DoNothingProcessor extends SimpleProcessor {
+
+    public DoNothingProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+    }
+
+    public static ProcessorDescriptor getProcDesc() {
+      return ProcessorDescriptor.create(DoNothingProcessor.class.getName());
+    }
+  }
+
+}