You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/02/24 19:51:42 UTC
[8/9] tez git commit: TEZ-3124. Running task hangs due to missing event to initialize input in recovery (zjffdu)
TEZ-3124. Running task hangs due to missing event to initialize input in recovery (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/701e9aa2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/701e9aa2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/701e9aa2
Branch: refs/heads/TEZ-2980
Commit: 701e9aa2ce04585be657bc8e1c3eb17317afad6b
Parents: 7fc28f7
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Feb 24 16:03:30 2016 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Feb 24 16:05:01 2016 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 13 +-
.../RecoveryServiceWithEventHandlingHook.java | 116 +++++++++++++++--
.../java/org/apache/tez/test/TestRecovery.java | 123 +++++++++++++++++++
4 files changed, 236 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/701e9aa2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5dc8976..fb0e8b6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-3029. Add an onError method to service plugin contexts.
ALL CHANGES:
+ TEZ-3124. Running task hangs due to missing event to initialize input in recovery.
TEZ-3135. tez-ext-service-tests, tez-plugins/tez-yarn-timeline-history and tez-tools/tez-javadoc-tools missing dependencies.
TEZ-3134. tez-dag should depend on commons-collections4.
TEZ-3126. Log reason for not reducing parallelism
http://git-wip-us.apache.org/repos/asf/tez/blob/701e9aa2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c8f217b..8b81be7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1861,12 +1861,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
void logJobHistoryVertexInitializedEvent() {
- // TODO Vertex init may happen multiple times, so it is possible to have multiple VertexInitializedEvent
- VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName,
- initTimeRequested, initedTime, numTasks,
- getProcessorName(), getAdditionalInputs(), initGeneratedEvents);
- this.appContext.getHistoryHandler().handle(
- new DAGHistoryEvent(getDAGId(), initEvt));
+ if (recoveryData == null || !recoveryData.shouldSkipInit()) {
+ VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName,
+ initTimeRequested, initedTime, numTasks,
+ getProcessorName(), getAdditionalInputs(), initGeneratedEvents);
+ this.appContext.getHistoryHandler().handle(
+ new DAGHistoryEvent(getDAGId(), initEvt));
+ }
}
void logJobHistoryVertexStartedEvent() {
http://git-wip-us.apache.org/repos/asf/tez/blob/701e9aa2/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
index 8a0f39e..c08780f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
@@ -20,6 +20,8 @@ package org.apache.tez.test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.codec.binary.Base64;
@@ -164,7 +166,7 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService {
throws IOException {
if (shutdownCondition.timing.equals(TIMING.PRE)
&& appContext.getApplicationAttemptId().getAttemptId() == 1
- && shouldShutdown(event)) {
+ && shutdownCondition.match(event.getHistoryEvent())) {
recoveryService.shutdown();
}
}
@@ -173,19 +175,11 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService {
public void postHandleRecoveryEvent(DAGHistoryEvent event)
throws IOException {
if (shutdownCondition.timing.equals(TIMING.POST)
- && appContext.getApplicationAttemptId().getAttemptId() == 1
- && shouldShutdown(event)) {
+ && appContext.getApplicationAttemptId().getAttemptId() == 1
+ && shutdownCondition.match(event.getHistoryEvent())) {
recoveryService.shutdown();
}
}
-
- private boolean shouldShutdown(DAGHistoryEvent event) {
- // only check whether to shutdown when it is the first AM attempt
- if (appContext.getApplicationAttemptId().getAttemptId() >= 2) {
- return false;
- }
- return shutdownCondition.match(event.getHistoryEvent());
- }
@Override
public void preHandleSummaryEvent(HistoryEventType eventType,
@@ -387,4 +381,104 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService {
return event.getEventType();
}
}
+
+ public static class MultipleRoundRecoveryEventHook extends RecoveryServiceHook {
+
+ public static final String MULTIPLE_ROUND_SHUTDOWN_CONDITION = "tez.test.recovery.multiple_round_shutdown_condition";
+ private MultipleRoundShutdownCondition shutdownCondition;
+ private int attemptId;
+
+ public MultipleRoundRecoveryEventHook(RecoveryServiceWithEventHandlingHook recoveryService, AppContext appContext) {
+ super(recoveryService, appContext);
+ this.shutdownCondition = new MultipleRoundShutdownCondition();
+ try {
+ Preconditions.checkArgument(recoveryService.getConfig().get(MULTIPLE_ROUND_SHUTDOWN_CONDITION) != null,
+ MULTIPLE_ROUND_SHUTDOWN_CONDITION + " is not set in TezConfiguration");
+ this.shutdownCondition.deserialize(recoveryService.getConfig().get(MULTIPLE_ROUND_SHUTDOWN_CONDITION));
+ } catch (IOException e) {
+ throw new TezUncheckedException("Can not initialize MultipleRoundShutdownCondition", e);
+ }
+ this.attemptId = appContext.getApplicationAttemptId().getAttemptId();
+ }
+
+ @Override
+ public void preHandleRecoveryEvent(DAGHistoryEvent event) throws IOException {
+ if (attemptId <= shutdownCondition.size()) {
+ SimpleShutdownCondition condition = shutdownCondition.getSimpleShutdownCondition(attemptId - 1);
+ if (condition.timing.equals(TIMING.PRE)
+ && condition.match(event.getHistoryEvent())) {
+ recoveryService.shutdown();
+ }
+ }
+ }
+
+ @Override
+ public void postHandleRecoveryEvent(DAGHistoryEvent event) throws IOException {
+ for (int i=0;i<shutdownCondition.size();++i) {
+ SimpleShutdownCondition condition = shutdownCondition.getSimpleShutdownCondition(i);
+ LOG.info("condition:" + condition.getEvent().getEventType() + ":" + condition.getHistoryEvent());
+ }
+ if (attemptId <= shutdownCondition.size()) {
+ SimpleShutdownCondition condition = shutdownCondition.getSimpleShutdownCondition(attemptId - 1);
+
+ LOG.info("event:" + event.getHistoryEvent().getEventType());
+ if (condition.timing.equals(TIMING.POST)
+ && condition.match(event.getHistoryEvent())) {
+ recoveryService.shutdown();
+ }
+ }
+ }
+
+ @Override
+ public void preHandleSummaryEvent(HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException {
+
+ }
+
+ @Override
+ public void postHandleSummaryEvent(HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException {
+
+ }
+ }
+
+ public static class MultipleRoundShutdownCondition {
+
+ private List<SimpleShutdownCondition> shutdownConditionList;
+
+ public MultipleRoundShutdownCondition() {
+
+ }
+
+ public MultipleRoundShutdownCondition(List<SimpleShutdownCondition> shutdownConditionList) {
+ this.shutdownConditionList = shutdownConditionList;
+ }
+
+ public String serialize() throws IOException {
+ StringBuilder builder = new StringBuilder();
+ for (int i=0; i< shutdownConditionList.size(); ++i) {
+ builder.append(shutdownConditionList.get(i).serialize());
+ if (i!=shutdownConditionList.size()-1) {
+ builder.append(";");
+ }
+ }
+ return builder.toString();
+ }
+
+ public MultipleRoundShutdownCondition deserialize(String str) throws IOException {
+ String[] splits = str.split(";");
+ shutdownConditionList = new ArrayList<SimpleShutdownCondition>();
+ for (String split : splits) {
+ SimpleShutdownCondition condition = new SimpleShutdownCondition();
+ shutdownConditionList.add(condition.deserialize(split));
+ }
+ return this;
+ }
+
+ public SimpleShutdownCondition getSimpleShutdownCondition(int index) {
+ return shutdownConditionList.get(index);
+ }
+
+ public int size() {
+ return shutdownConditionList.size();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/701e9aa2/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
index dc26167..3f669c6 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
@@ -301,6 +301,56 @@ public class TestRecovery {
}
+ private void testOrderedWordCountMultipleRoundRecoverying(
+ RecoveryServiceWithEventHandlingHook.MultipleRoundShutdownCondition shutdownCondition,
+ boolean enableAutoParallelism, boolean generateSplitInClient) throws Exception {
+
+ for (int i=0; i<shutdownCondition.size(); i++) {
+ SimpleShutdownCondition condition = shutdownCondition.getSimpleShutdownCondition(i);
+ LOG.info("ShutdownCondition:" + condition.getEventType()
+ + ", event=" + condition.getEvent());
+ }
+
+ String inputDirStr = "/tmp/owc-input/";
+ Path inputDir = new Path(inputDirStr);
+ Path stagingDirPath = new Path("/tmp/owc-staging-dir");
+ remoteFs.mkdirs(inputDir);
+ remoteFs.mkdirs(stagingDirPath);
+ TestTezJobs.generateOrderedWordCountInput(inputDir, remoteFs);
+
+ String outputDirStr = "/tmp/owc-output/";
+ Path outputDir = new Path(outputDirStr);
+
+ TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+ tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
+ tezConf.set(TezConfiguration.TEZ_AM_RECOVERY_SERVICE_CLASS,
+ RecoveryServiceWithEventHandlingHook.class.getName());
+ tezConf.set(
+ RecoveryServiceWithEventHandlingHook.AM_RECOVERY_SERVICE_HOOK_CLASS,
+ RecoveryServiceWithEventHandlingHook.MultipleRoundRecoveryEventHook.class.getName());
+ tezConf.set(RecoveryServiceWithEventHandlingHook.MultipleRoundRecoveryEventHook.MULTIPLE_ROUND_SHUTDOWN_CONDITION,
+ shutdownCondition.serialize());
+ tezConf.setBoolean(
+ ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
+ enableAutoParallelism);
+ tezConf.setBoolean(
+ RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, false);
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+ tezConf.setBoolean(
+ TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false);
+ OrderedWordCount job = new OrderedWordCount();
+ if (generateSplitInClient) {
+ Assert
+ .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{
+ "-generateSplitInClient", inputDirStr, outputDirStr, "5"}, null) == 0);
+ } else {
+ Assert
+ .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{
+ inputDirStr, outputDirStr, "5"}, null) == 0);
+ }
+ TestTezJobs.verifyOutput(outputDir, remoteFs);
+ }
+
@Test(timeout = 1800000)
public void testRecovery_HashJoin() throws Exception {
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
@@ -501,4 +551,77 @@ public class TestRecovery {
assertTrue(shutdownCondition.match(lastEvent));
}
+ @Test(timeout = 1800000)
+ public void testTwoRoundsRecoverying() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
+ 1);
+ TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexId0 = TezVertexID.getInstance(dagId, 0);
+ TezVertexID vertexId1 = TezVertexID.getInstance(dagId, 1);
+ TezVertexID vertexId2 = TezVertexID.getInstance(dagId, 2);
+ ContainerId containerId = ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(appId, 1), 1);
+ NodeId nodeId = NodeId.newInstance("localhost", 10);
+ List<TezEvent> initGeneratedEvents = Lists.newArrayList(
+ new TezEvent(InputDataInformationEvent.createWithObjectPayload(0, new Object()), null));
+
+
+ List<SimpleShutdownCondition> shutdownConditions = Lists.newArrayList(
+
+ new SimpleShutdownCondition(TIMING.POST, new DAGInitializedEvent(
+ dagId, 0L, "username", "dagName", null)),
+ new SimpleShutdownCondition(TIMING.POST, new DAGStartedEvent(dagId,
+ 0L, "username", "dagName")),
+ new SimpleShutdownCondition(TIMING.POST,
+ new VertexInitializedEvent(vertexId0, "Tokenizer", 0L, 0L, 0,
+ "", null, initGeneratedEvents)),
+ new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent(
+ vertexId0, 0L, 0L)),
+ new SimpleShutdownCondition(TIMING.POST,
+ new VertexConfigurationDoneEvent(vertexId0, 0L, 2, null, null,
+ null, true)),
+ new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(
+ TezTaskID.getInstance(vertexId0, 0), "vertexName", 0L, 0L)),
+ new SimpleShutdownCondition(TIMING.POST,
+ new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(vertexId0, 0), 0), "vertexName", 0L,
+ containerId, nodeId, "", "", "")),
+ new SimpleShutdownCondition(TIMING.POST, new TaskFinishedEvent(
+ TezTaskID.getInstance(vertexId0, 0), "vertexName", 0L, 0L,
+ null, TaskState.SUCCEEDED, "", new TezCounters(), 0)),
+ new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
+ vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
+ VertexState.SUCCEEDED, "", new TezCounters(),
+ new VertexStats(), new HashMap<String, Integer>())),
+ new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
+ vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
+ VertexState.SUCCEEDED, "", new TezCounters(),
+ new VertexStats(), new HashMap<String, Integer>())),
+ new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
+ vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
+ VertexState.SUCCEEDED, "", new TezCounters(),
+ new VertexStats(), new HashMap<String, Integer>())),
+ new SimpleShutdownCondition(TIMING.POST, new DAGFinishedEvent(
+ dagId, 0L, 0L, DAGState.SUCCEEDED, "", new TezCounters(),
+ "username", "dagName", new HashMap<String, Integer>(),
+ ApplicationAttemptId.newInstance(appId, 1), null))
+
+ );
+
+ Random rand = new Random();
+ for (int i = 0; i < shutdownConditions.size() - 1; i++) {
+ // randomly choose half of the test scenario to avoid
+ // timeout.
+ if (rand.nextDouble()<0.5) {
+ int nextSimpleConditionIndex = i + 1 + rand.nextInt(shutdownConditions.size() - i - 1);
+ if (nextSimpleConditionIndex == shutdownConditions.size() - 1) {
+ testOrderedWordCountMultipleRoundRecoverying(
+ new RecoveryServiceWithEventHandlingHook.MultipleRoundShutdownCondition(
+ Lists.newArrayList(shutdownConditions.get(i), shutdownConditions.get(nextSimpleConditionIndex)))
+ , true,
+ shutdownConditions.get(i).getHistoryEvent().getEventType() == HistoryEventType.VERTEX_STARTED);
+ }
+ }
+ }
+ }
}