You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/09/11 06:59:18 UTC
git commit: TEZ-1559. Add system tests for AM recovery. (Jeff Zhang
via hitesh)
Repository: tez
Updated Branches:
refs/heads/master dfe38490f -> 43e47bfac
TEZ-1559. Add system tests for AM recovery. (Jeff Zhang via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/43e47bfa
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/43e47bfa
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/43e47bfa
Branch: refs/heads/master
Commit: 43e47bfac239dada7564b6f8e78462143be06ec7
Parents: dfe3849
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Sep 10 21:58:20 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Sep 10 21:58:20 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 9 +-
.../org/apache/tez/dag/app/RecoveryParser.java | 15 +-
.../dag/history/recovery/RecoveryService.java | 39 ++
.../org/apache/tez/test/TestAMRecovery.java | 635 +++++++++++++++++++
4 files changed, 689 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/43e47bfa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2cb7bef..0c0bb39 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
TEZ-1544. Link to release artifacts for 0.5.0 does not point to a specific link for 0.5.0.
+ TEZ-1559. Add system tests for AM recovery.
Release 0.5.1: Unreleased
@@ -18,7 +19,7 @@ ALL CHANGES
TEZ-1536. Fix spelling typo "configurartion" in TezClientUtils.
TEZ-1310. Update website documentation framework
TEZ-1447. Provide a mechanism for InputInitializers to know about Vertex state changes.
- TEZ-1362. Remove DAG_COMPLETED in DAGEventType.
+ TEZ-1362. Remove DAG_COMPLETED in DAGEventType.
TEZ-1519. TezTaskRunner should not initialize TezConfiguration in TezChild.
Release 0.5.0: Unreleased
@@ -27,7 +28,7 @@ INCOMPATIBLE CHANGES
TEZ-1038. Move TaskLocationHint outside of VertexLocationHint.
TEZ-960. VertexManagerPluginContext::getTotalAVailableResource() changed to
VertexManagerPluginContext::getTotalAvailableResource()
- TEZ-1025. Rename tez.am.max.task.attempts to tez.am.task.max.failed.attempts
+ TEZ-1025. Rename tez.am.max.task.attempts to tez.am.task.max.failed.attempts
TEZ-1018. VertexManagerPluginContext should enable assigning locality to
scheduled tasks
TEZ-1169. Allow numPhysicalInputs to be specified for RootInputs.
@@ -40,8 +41,8 @@ INCOMPATIBLE CHANGES
- Details at https://issues.apache.org/jira/browse/TEZ-1213?focusedCommentId
=14039381&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpa
nel#comment-14039381
- TEZ-1080, TEZ-1272, TEZ-1279, TEZ-1266. Change YARNRunner to use EdgeConfigs.
- - Removes separation of runtime configs into input/ouput configs. Also
+ TEZ-1080, TEZ-1272, TEZ-1279, TEZ-1266. Change YARNRunner to use EdgeConfigs.
+ - Removes separation of runtime configs into input/output configs. Also
refactors public methods used for this conversion.
TEZ-696. Remove implicit copying of processor payload to input and output
TEZ-1269. TaskScheduler prematurely releases containers
http://git-wip-us.apache.org/repos/asf/tez/blob/43e47bfa/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 715d4e0..9ba5847 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -235,18 +235,18 @@ public class RecoveryParser {
- private static void parseDAGRecoveryFile(FSDataInputStream inputStream)
+ /**
+ * Reads every {@link HistoryEvent} from a DAG recovery stream, in the order
+ * they appear, stopping when getNextEvent() returns null.
+ *
+ * @param inputStream stream opened on a DAG recovery file; this method does
+ * not close it, so the caller remains responsible for closing it
+ * @return the parsed events in read order (empty if the stream has none)
+ * @throws IOException propagated from getNextEvent() when reading fails
+ */
+ public static List<HistoryEvent> parseDAGRecoveryFile(FSDataInputStream inputStream)
throws IOException {
+ List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
while (true) {
HistoryEvent historyEvent = getNextEvent(inputStream);
if (historyEvent == null) {
LOG.info("Reached end of stream");
break;
}
- LOG.info("Parsed event from recovery stream"
- + ", eventType=" + historyEvent.getEventType()
- + ", event=" + historyEvent);
+ // Collect instead of logging here; main() now logs the returned events.
+ historyEvents.add(historyEvent);
}
+ return historyEvents;
}
public static void main(String argv[]) throws IOException {
@@ -264,7 +264,12 @@ public class RecoveryParser {
parseSummaryFile(fs.open(new Path(summaryPath)));
for (String dagPath : dagPaths) {
LOG.info("Parsing DAG recovery file " + dagPath);
- parseDAGRecoveryFile(fs.open(new Path(dagPath)));
+ List<HistoryEvent> historyEvents = parseDAGRecoveryFile(fs.open(new Path(dagPath)));
+ for (HistoryEvent historyEvent : historyEvents) {
+ LOG.info("Parsed event from recovery stream"
+ + ", eventType=" + historyEvent.getEventType()
+ + ", event=" + historyEvent);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/43e47bfa/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 737fde8..0034feb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -45,6 +45,8 @@ import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.records.TezDAGID;
+import com.google.common.annotations.VisibleForTesting;
+
public class RecoveryService extends AbstractService {
private static final Log LOG = LogFactory.getLog(RecoveryService.class);
@@ -53,6 +55,21 @@ public class RecoveryService extends AbstractService {
public static final String RECOVERY_FATAL_OCCURRED_DIR =
"RecoveryFatalErrorOccurred";
+ /**
+ * Boolean configuration key. When true, serviceStop() drains the events still
+ * left in the event queue and passes each one to handleRecoveryEvent(). This
+ * happens after the event handling thread is interrupted and before the
+ * summary stream is closed. Errors while handling a drained event are logged
+ * and otherwise ignored.
+ */
+ @VisibleForTesting
+ public static final String TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED =
+ TezConfiguration.TEZ_AM_PREFIX + "recovery.handle_remaining_event_when_stopped";
+
+ /**
+ * Default for TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED: false,
+ * i.e. events still queued when the service stops are not handled.
+ * Enabling it is mainly intended for recovery tests.
+ */
+ @VisibleForTesting
+ public static final boolean TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED_DEFAULT = false;
+
+
private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
new LinkedBlockingQueue<DAGHistoryEvent>();
private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>();
@@ -75,6 +92,7 @@ public class RecoveryService extends AbstractService {
private int maxUnflushedEvents;
private int flushInterval;
private AtomicBoolean recoveryFatalErrorOccurred = new AtomicBoolean(false);
+ private boolean handleRemainingEventWhenStopped;
public RecoveryService(AppContext appContext) {
super(RecoveryService.class.getName());
@@ -93,6 +111,10 @@ public class RecoveryService extends AbstractService {
TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT);
maxUnflushedEvents = conf.getInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS,
TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT);
+
+ handleRemainingEventWhenStopped = conf.getBoolean(
+ TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED,
+ TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED_DEFAULT);
}
@Override
@@ -150,11 +172,28 @@ public class RecoveryService extends AbstractService {
@Override
public void serviceStop() {
LOG.info("Stopping RecoveryService");
+
stopped.set(true);
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();
}
+ if (handleRemainingEventWhenStopped) {
+ LOG.info("Handle the remaining events in queue, queue size=" + eventQueue.size());
+ while(!eventQueue.isEmpty()) {
+ synchronized (lock) {
+ try {
+ DAGHistoryEvent event = eventQueue.take();
+ handleRecoveryEvent(event);
+ } catch (Exception e) {
+ // For now, ignore any such errors as these are non-critical
+ // All summary event related errors are handled as critical
+ LOG.warn("Error handling recovery event", e);
+ }
+ }
+ }
+ }
+
if (summaryStream != null) {
try {
LOG.info("Closing Summary Stream");
http://git-wip-us.apache.org/repos/asf/tez/blob/43e47bfa/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
new file mode 100644
index 0000000..eae38f0
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -0,0 +1,635 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.app.RecoveryParser;
+import org.apache.tez.dag.app.dag.impl.ImmediateStartVertexManager;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.recovery.RecoveryService;
+import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * System tests for Tez AM recovery.
+ *
+ * Each test runs a two-vertex DAG (v1 -> v2) in a Tez session on a MiniTezCluster backed
+ * by a MiniDFSCluster. A custom vertex manager on v2 kills the AM process with
+ * {@code System.exit} at a controlled point. The DAG must still succeed in a later AM
+ * attempt. The per-attempt DAG recovery logs are then parsed to check which v1 tasks were
+ * re-run.
+ */
+public class TestAMRecovery {
+
+  private static final Log LOG = LogFactory.getLog(TestAMRecovery.class);
+
+  private static final Configuration conf = new Configuration();
+  private static TezConfiguration tezConf;
+  /** AM attempt limit configured for both YARN and Tez in these tests. */
+  private static final int MAX_AM_ATTEMPT = 50;
+  private static MiniTezCluster miniTezCluster = null;
+  private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestAMRecovery.class.getName() + "-tmpDir";
+  private static MiniDFSCluster dfsCluster = null;
+  private static TezClient tezSession = null;
+  private static FileSystem remoteFs = null;
+  /**
+   * Vertex manager config key. When true, the AM is killed once task 0 of v1 completes;
+   * when false, it is killed once task 1 of v1 completes.
+   */
+  private static final String FAIL_ON_PARTIAL_FINISHED = "FAIL_ON_PARTIAL_COMPLETED";
+  /** Vertex manager config key: every DAG attempt numbered below this value is killed. */
+  private static final String FAIL_ON_ATTEMPT = "FAIL_ON_ATTEMPT";
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    LOG.info("Starting mini clusters");
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true)
+          .racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+    if (miniTezCluster == null) {
+      miniTezCluster = new MiniTezCluster(TestAMRecovery.class.getName(), 1, 1, 1);
+      Configuration miniTezconf = new Configuration(conf);
+      // YARN must allow as many AM attempts as the tests are willing to kill.
+      miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, MAX_AM_ATTEMPT);
+      miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      miniTezCluster.init(miniTezconf);
+      miniTezCluster.start();
+    }
+  }
+
+  @AfterClass
+  public static void afterClass() throws InterruptedException {
+    stopTezSession();
+    // NOTE(review): fixed delay presumably lets AM/task processes wind down before the
+    // clusters are stopped - confirm it is still needed.
+    Thread.sleep(10000);
+    if (miniTezCluster != null) {
+      try {
+        LOG.info("Stopping MiniTezCluster");
+        miniTezCluster.stop();
+      } catch (Exception e) {
+        LOG.warn("Failed to stop MiniTezCluster", e);
+      }
+    }
+    if (dfsCluster != null) {
+      try {
+        LOG.info("Stopping DFSCluster");
+        dfsCluster.shutdown();
+      } catch (Exception e) {
+        LOG.warn("Failed to stop DFSCluster", e);
+      }
+    }
+  }
+
+  /**
+   * Starts a fresh session per test. It uses a random staging dir, recovery events are
+   * flushed immediately, and the RecoveryService drains its queue on stop, so the recovery
+   * logs read by the tests are complete.
+   */
+  @Before
+  public void setup() throws Exception {
+    LOG.info("Starting session");
+    Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR,
+        String.valueOf(new Random().nextInt(100000))));
+    TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+
+    tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
+    tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO");
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+    tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, MAX_AM_ATTEMPT);
+    tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
+    tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
+    // Keep the staging dir so the recovery logs can be read after the DAG finishes.
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false);
+    tezConf.setBoolean(
+        RecoveryService.TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED, true);
+    tezSession = TezClient.create("TestDAGRecovery", tezConf);
+    tezSession.start();
+  }
+
+  @After
+  public void teardown() throws InterruptedException {
+    stopTezSession();
+    Thread.sleep(10000);
+  }
+
+  /**
+   * Stops the current session, if any, and clears the field. A failure to stop is logged
+   * and not propagated, so cluster teardown can continue.
+   */
+  private static void stopTezSession() {
+    if (tezSession != null) {
+      try {
+        LOG.info("Stopping Tez Session");
+        tezSession.stop();
+      } catch (Exception e) {
+        LOG.warn("Failed to stop Tez Session", e);
+      }
+    }
+    tezSession = null;
+  }
+
+  /**
+   * Fine-grained task-level recovery. In vertex v1, task 0 is done while task 1 is still
+   * running when history is flushed and the AM dies. After the AM recovers, task 0 is not
+   * re-run and task 1 is re-run. (BROADCAST)
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testVertexPartiallyFinished_Broadcast() throws Exception {
+    runDAGAndVerifyRecovery(ControlledInputReadyVertexManager.class,
+        DataMovementType.BROADCAST, true);
+  }
+
+  /**
+   * Fine-grained task-level recovery. In vertex v1, both task 0 and task 1 are done when
+   * history is flushed and the AM dies. After the AM recovers, neither task is re-run.
+   * (BROADCAST)
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testVertexCompletelyFinished_Broadcast() throws Exception {
+    runDAGAndVerifyRecovery(ControlledInputReadyVertexManager.class,
+        DataMovementType.BROADCAST, false);
+  }
+
+  /**
+   * Fine-grained task-level recovery. In vertex v1, task 0 is done while task 1 is still
+   * running when history is flushed and the AM dies. After the AM recovers, task 0 is not
+   * re-run and task 1 is re-run. (ONE_TO_ONE)
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testVertexPartialComplete_One2One() throws Exception {
+    runDAGAndVerifyRecovery(ControlledInputReadyVertexManager.class,
+        DataMovementType.ONE_TO_ONE, true);
+  }
+
+  /**
+   * Fine-grained task-level recovery. In vertex v1, both task 0 and task 1 are done when
+   * history is flushed and the AM dies. After the AM recovers, neither task is re-run.
+   * (ONE_TO_ONE)
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testVertexCompletelyComplete_One2One() throws Exception {
+    runDAGAndVerifyRecovery(ControlledInputReadyVertexManager.class,
+        DataMovementType.ONE_TO_ONE, false);
+  }
+
+  /**
+   * Fine-grained task-level recovery. In vertex v1, task 0 is done while task 1 is still
+   * running when history is flushed and the AM dies. After the AM recovers, task 0 is not
+   * re-run and task 1 is re-run. (SCATTER_GATHER)
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testVertexPartiallyFinished_ScatterGather() throws Exception {
+    runDAGAndVerifyRecovery(ControlledShuffleVertexManager.class,
+        DataMovementType.SCATTER_GATHER, true);
+  }
+
+  /**
+   * Fine-grained task-level recovery. In vertex v1, both task 0 and task 1 are done when
+   * history is flushed and the AM dies. After the AM recovers, neither task is re-run.
+   * (SCATTER_GATHER)
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testVertexCompletelyFinished_ScatterGather() throws Exception {
+    runDAGAndVerifyRecovery(ControlledShuffleVertexManager.class,
+        DataMovementType.SCATTER_GATHER, false);
+  }
+
+  /**
+   * Sets the AM max attempts to a high number and kills many AM attempts. Every DAG attempt
+   * numbered below a randomly chosen value is killed; the last AM must still recover from
+   * the latest history data and finish the DAG.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 600000)
+  public void testHighMaxAttempt() throws Exception {
+    Random rand = new Random();
+    // Choose from [2, MAX_AM_ATTEMPT - 1]. At least one AM attempt is always killed (a value
+    // of 0 or 1 would kill none), and the surviving attempt stays within the attempt limit.
+    int failOnAttempt = 2 + rand.nextInt(MAX_AM_ATTEMPT - 2);
+    tezConf.setInt(FAIL_ON_ATTEMPT, failOnAttempt);
+    LOG.info("Set FAIL_ON_ATTEMPT=" + tezConf.get(FAIL_ON_ATTEMPT));
+    DAG dag = createDAG(FailOnAttemptVertexManager.class,
+        DataMovementType.SCATTER_GATHER, false);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+
+  /**
+   * Runs a v1 -> v2 DAG whose vertex manager on v2 kills the first AM attempt and checks
+   * that the DAG still succeeds. It then uses the recovery logs of AM attempts 1 and 2 to
+   * verify how often each v1 task finished.
+   *
+   * @param vertexManagerClass controlled vertex manager installed on v2
+   * @param dmType data movement type of the v1 -> v2 edge
+   * @param failOnPartialCompleted true to kill the AM once v1's task 0 finishes (task 1
+   *          still running); false to kill it once task 1 finishes (v1 fully finished)
+   */
+  private void runDAGAndVerifyRecovery(Class<?> vertexManagerClass,
+      DataMovementType dmType, boolean failOnPartialCompleted) throws Exception {
+    DAG dag = createDAG(vertexManagerClass, dmType, failOnPartialCompleted);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+
+    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+
+    // AM attempt 1: task_0 of v1 finished in every scenario. task_1 finished only when the
+    // AM was killed after v1 completed.
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
+    assertEquals(failOnPartialCompleted ? 0 : 1,
+        findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
+
+    // AM attempt 2: exactly one finished attempt per task. Tasks finished in attempt 1 were
+    // not re-run, and a task_1 left unfinished in attempt 1 finished in attempt 2.
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
+    assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
+  }
+
+  void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
+    tezSession.waitTillReady();
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(
+        EnumSet.of(StatusGetOpts.GET_COUNTERS));
+    Assert.assertEquals(finalState, dagStatus.getState());
+  }
+
+  /**
+   * Builds the DAG v1 --> v2, where v2 has a custom vertex manager that controls when
+   * the AM is killed.
+   *
+   * @param vertexManagerClass vertex manager plugin class for v2
+   * @param dmType data movement type of the v1 -> v2 edge
+   * @param failOnPartialCompleted stored in tezConf as FAIL_ON_PARTIAL_FINISHED
+   * @return the DAG
+   * @throws IOException if tezConf cannot be serialized into the vertex manager payload
+   */
+  private DAG createDAG(Class<?> vertexManagerClass, DataMovementType dmType,
+      boolean failOnPartialCompleted) throws IOException {
+    // Must be set before tezConf is serialized into v2's vertex manager payload below.
+    tezConf.setBoolean(FAIL_ON_PARTIAL_FINISHED, failOnPartialCompleted);
+    DAG dag = DAG.create("dag");
+    UserPayload payload = UserPayload.create(null);
+    Vertex v1 = Vertex.create("v1", MyProcessor.getProcDesc(), 2);
+    Vertex v2 = Vertex.create("v2", DoNothingProcessor.getProcDesc(), 2);
+    v2.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
+        vertexManagerClass.getName()).setUserPayload(
+        TezUtils.createUserPayloadFromConf(tezConf)));
+
+    dag.addVertex(v1).addVertex(v2);
+    dag.addEdge(Edge.create(v1, v2, EdgeProperty.create(dmType,
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+        TestOutput.getOutputDesc(payload), TestInput.getInputDesc(payload))));
+    return dag;
+  }
+
+  /** Returns the TASK_ATTEMPT_FINISHED events recorded for the given vertex/task id pair. */
+  private List<TaskAttemptFinishedEvent> findTaskAttemptFinishedEvent(
+      List<HistoryEvent> historyEvents, int vertexId, int taskId) {
+    List<TaskAttemptFinishedEvent> resultEvents = new ArrayList<TaskAttemptFinishedEvent>();
+    for (HistoryEvent historyEvent : historyEvents) {
+      if (historyEvent.getEventType() != HistoryEventType.TASK_ATTEMPT_FINISHED) {
+        continue;
+      }
+      TaskAttemptFinishedEvent taFinishedEvent = (TaskAttemptFinishedEvent) historyEvent;
+      if (taFinishedEvent.getTaskAttemptID().getTaskID().getVertexID().getId() == vertexId
+          && taFinishedEvent.getTaskAttemptID().getTaskID().getId() == taskId) {
+        resultEvents.add(taFinishedEvent);
+      }
+    }
+    return resultEvents;
+  }
+
+  /**
+   * Parses the DAG recovery file written by the given AM attempt of the current session.
+   * The stream is closed once parsing finishes.
+   */
+  private List<HistoryEvent> readRecoveryLog(int attemptNum) throws IOException {
+    ApplicationId appId = tezSession.getAppMasterApplicationId();
+    Path tezSystemStagingDir =
+        TezCommonUtils.getTezSystemStagingPath(tezConf, appId.toString());
+    Path recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf);
+    FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf);
+    Path currentAttemptRecoveryDataDir =
+        TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, attemptNum);
+    // Each test submits exactly one DAG to a fresh session. The file name is therefore the
+    // application id with "application" replaced by "dag", plus "_1".
+    Path recoveryFilePath = new Path(currentAttemptRecoveryDataDir,
+        appId.toString().replace("application", "dag")
+            + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+    FSDataInputStream in = fs.open(recoveryFilePath);
+    try {
+      return RecoveryParser.parseDAGRecoveryFile(in);
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Deserializes the Configuration carried in a vertex manager's user payload. A failure is
+   * rethrown with its cause rather than leaving the manager without a config.
+   */
+  private static Configuration readVertexManagerConf(UserPayload payload) {
+    try {
+      return TezUtils.createConfFromUserPayload(payload);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "Failed to read Configuration from vertex manager payload", e);
+    }
+  }
+
+  /**
+   * Kills the AM process during the first DAG attempt once the chosen source task
+   * completes. Task 0 is chosen when FAIL_ON_PARTIAL_FINISHED is true (the default) and
+   * task 1 otherwise.
+   */
+  private static void killAMIfRequested(Configuration vmConf, int dagAttemptNumber,
+      Integer taskId) {
+    if (dagAttemptNumber != 1) {
+      return;
+    }
+    int killOnTask = vmConf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true) ? 0 : 1;
+    if (taskId == killOnTask) {
+      System.exit(-1);
+    }
+  }
+
+  public static class ControlledInputReadyVertexManager extends
+      InputReadyVertexManager {
+
+    private Configuration vmConf;
+
+    public ControlledInputReadyVertexManager(VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() {
+      super.initialize();
+      vmConf = readVertexManagerConf(getContext().getUserPayload());
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+      super.onSourceTaskCompleted(srcVertexName, taskId);
+      killAMIfRequested(vmConf, getContext().getDAGAttemptNumber(), taskId);
+    }
+  }
+
+  public static class ControlledShuffleVertexManager extends
+      ShuffleVertexManager {
+
+    private Configuration vmConf;
+
+    public ControlledShuffleVertexManager(VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() {
+      super.initialize();
+      vmConf = readVertexManagerConf(getContext().getUserPayload());
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+      super.onSourceTaskCompleted(srcVertexName, taskId);
+      killAMIfRequested(vmConf, getContext().getDAGAttemptNumber(), taskId);
+    }
+  }
+
+  /** Same AM-kill behavior on top of ImmediateStartVertexManager (not used by the tests above). */
+  public static class ControlledImmediateStartVertexManager extends
+      ImmediateStartVertexManager {
+
+    private Configuration vmConf;
+
+    public ControlledImmediateStartVertexManager(
+        VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() {
+      super.initialize();
+      vmConf = readVertexManagerConf(getContext().getUserPayload());
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+      super.onSourceTaskCompleted(srcVertexName, taskId);
+      killAMIfRequested(vmConf, getContext().getDAGAttemptNumber(), taskId);
+    }
+  }
+
+  /**
+   * Vertex manager that kills the AM on every source task completion while the DAG attempt
+   * number is below FAIL_ON_ATTEMPT (default 1, i.e. never).
+   */
+  public static class FailOnAttemptVertexManager extends ShuffleVertexManager {
+
+    private Configuration vmConf;
+
+    public FailOnAttemptVertexManager(VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() {
+      super.initialize();
+      vmConf = readVertexManagerConf(getContext().getUserPayload());
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+      super.onSourceTaskCompleted(srcVertexName, taskId);
+      int curAttempt = getContext().getDAGAttemptNumber();
+      int failOnAttempt = vmConf.getInt(FAIL_ON_ATTEMPT, 1);
+      LOG.info("failOnAttempt:" + failOnAttempt);
+      LOG.info("curAttempt:" + curAttempt);
+      if (curAttempt < failOnAttempt) {
+        System.exit(-1);
+      }
+    }
+  }
+
+  /**
+   * Returns immediately in task 0 and sleeps 3 seconds in every other task. This lets the
+   * vertex manager kill the AM while some tasks are still running.
+   */
+  public static class MyProcessor extends SimpleProcessor {
+
+    public MyProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      if (getContext().getTaskIndex() != 0) {
+        Thread.sleep(3 * 1000);
+      }
+    }
+
+    public static ProcessorDescriptor getProcDesc() {
+      return ProcessorDescriptor.create(MyProcessor.class.getName());
+    }
+  }
+
+  /** Processor whose run() does nothing. */
+  public static class DoNothingProcessor extends SimpleProcessor {
+
+    public DoNothingProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+    }
+
+    public static ProcessorDescriptor getProcDesc() {
+      return ProcessorDescriptor.create(DoNothingProcessor.class.getName());
+    }
+  }
+
+}