You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/02/24 19:51:42 UTC

[8/9] tez git commit: TEZ-3124. Running task hangs due to missing event to initialize input in recovery (zjffdu)

TEZ-3124. Running task hangs due to missing event to initialize input in recovery (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/701e9aa2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/701e9aa2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/701e9aa2

Branch: refs/heads/TEZ-2980
Commit: 701e9aa2ce04585be657bc8e1c3eb17317afad6b
Parents: 7fc28f7
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Feb 24 16:03:30 2016 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Feb 24 16:05:01 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  13 +-
 .../RecoveryServiceWithEventHandlingHook.java   | 116 +++++++++++++++--
 .../java/org/apache/tez/test/TestRecovery.java  | 123 +++++++++++++++++++
 4 files changed, 236 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/701e9aa2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5dc8976..fb0e8b6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
+  TEZ-3124. Running task hangs due to missing event to initialize input in recovery.
   TEZ-3135. tez-ext-service-tests, tez-plugins/tez-yarn-timeline-history and tez-tools/tez-javadoc-tools missing dependencies.
   TEZ-3134. tez-dag should depend on commons-collections4.
   TEZ-3126. Log reason for not reducing parallelism

http://git-wip-us.apache.org/repos/asf/tez/blob/701e9aa2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c8f217b..8b81be7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1861,12 +1861,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
 
   void logJobHistoryVertexInitializedEvent() {
-    // TODO Vertex init may happen multiple times, so it is possible to have multiple VertexInitializedEvent
-    VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName,
-        initTimeRequested, initedTime, numTasks,
-        getProcessorName(), getAdditionalInputs(), initGeneratedEvents);
-    this.appContext.getHistoryHandler().handle(
-        new DAGHistoryEvent(getDAGId(), initEvt));
+    if (recoveryData == null || !recoveryData.shouldSkipInit()) {
+      VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName,
+              initTimeRequested, initedTime, numTasks,
+              getProcessorName(), getAdditionalInputs(), initGeneratedEvents);
+      this.appContext.getHistoryHandler().handle(
+              new DAGHistoryEvent(getDAGId(), initEvt));
+    }
   }
 
   void logJobHistoryVertexStartedEvent() {

http://git-wip-us.apache.org/repos/asf/tez/blob/701e9aa2/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
index 8a0f39e..c08780f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
@@ -20,6 +20,8 @@ package org.apache.tez.test;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.codec.binary.Base64;
@@ -164,7 +166,7 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService {
         throws IOException {
       if (shutdownCondition.timing.equals(TIMING.PRE)
           && appContext.getApplicationAttemptId().getAttemptId() == 1
-          && shouldShutdown(event)) {
+          && shutdownCondition.match(event.getHistoryEvent())) {
         recoveryService.shutdown();
       }
     }
@@ -173,19 +175,11 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService {
     public void postHandleRecoveryEvent(DAGHistoryEvent event)
         throws IOException {
       if (shutdownCondition.timing.equals(TIMING.POST)
-          && appContext.getApplicationAttemptId().getAttemptId() == 1
-          && shouldShutdown(event)) {
+         && appContext.getApplicationAttemptId().getAttemptId() == 1
+         && shutdownCondition.match(event.getHistoryEvent())) {
         recoveryService.shutdown();
       }
     }
-
-    private boolean shouldShutdown(DAGHistoryEvent event) {
-      // only check whether to shutdown when it is the first AM attempt
-      if (appContext.getApplicationAttemptId().getAttemptId() >= 2) {
-        return false;
-      }
-      return shutdownCondition.match(event.getHistoryEvent());
-    }
  
     @Override
     public void preHandleSummaryEvent(HistoryEventType eventType,
@@ -387,4 +381,104 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService {
       return event.getEventType();
     }
   }
+
+  public static class MultipleRoundRecoveryEventHook extends RecoveryServiceHook {
+
+    public static final String MULTIPLE_ROUND_SHUTDOWN_CONDITION = "tez.test.recovery.multiple_round_shutdown_condition";
+    private MultipleRoundShutdownCondition shutdownCondition;
+    private int attemptId;
+
+    public MultipleRoundRecoveryEventHook(RecoveryServiceWithEventHandlingHook recoveryService, AppContext appContext) {
+      super(recoveryService, appContext);
+      this.shutdownCondition = new MultipleRoundShutdownCondition();
+      try {
+        Preconditions.checkArgument(recoveryService.getConfig().get(MULTIPLE_ROUND_SHUTDOWN_CONDITION) != null,
+                MULTIPLE_ROUND_SHUTDOWN_CONDITION + " is not set in TezConfiguration");
+        this.shutdownCondition.deserialize(recoveryService.getConfig().get(MULTIPLE_ROUND_SHUTDOWN_CONDITION));
+      } catch (IOException e) {
+        throw new TezUncheckedException("Can not initialize MultipleRoundShutdownCondition", e);
+      }
+      this.attemptId = appContext.getApplicationAttemptId().getAttemptId();
+    }
+
+    @Override
+    public void preHandleRecoveryEvent(DAGHistoryEvent event) throws IOException {
+      if (attemptId <= shutdownCondition.size()) {
+        SimpleShutdownCondition condition = shutdownCondition.getSimpleShutdownCondition(attemptId - 1);
+        if (condition.timing.equals(TIMING.PRE)
+                && condition.match(event.getHistoryEvent())) {
+          recoveryService.shutdown();
+        }
+      }
+    }
+
+    @Override
+    public void postHandleRecoveryEvent(DAGHistoryEvent event) throws IOException {
+      for (int i=0;i<shutdownCondition.size();++i) {
+        SimpleShutdownCondition condition = shutdownCondition.getSimpleShutdownCondition(i);
+        LOG.info("condition:" + condition.getEvent().getEventType() + ":" + condition.getHistoryEvent());
+      }
+      if (attemptId <= shutdownCondition.size()) {
+        SimpleShutdownCondition condition = shutdownCondition.getSimpleShutdownCondition(attemptId - 1);
+
+        LOG.info("event:" + event.getHistoryEvent().getEventType());
+        if (condition.timing.equals(TIMING.POST)
+                && condition.match(event.getHistoryEvent())) {
+          recoveryService.shutdown();
+        }
+      }
+    }
+
+    @Override
+    public void preHandleSummaryEvent(HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException {
+
+    }
+
+    @Override
+    public void postHandleSummaryEvent(HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException {
+
+    }
+  }
+
+  public static class MultipleRoundShutdownCondition {
+
+    private List<SimpleShutdownCondition> shutdownConditionList;
+
+    public MultipleRoundShutdownCondition() {
+
+    }
+
+    public MultipleRoundShutdownCondition(List<SimpleShutdownCondition> shutdownConditionList) {
+      this.shutdownConditionList = shutdownConditionList;
+    }
+
+    public String serialize() throws IOException {
+      StringBuilder builder = new StringBuilder();
+      for (int i=0; i< shutdownConditionList.size(); ++i) {
+        builder.append(shutdownConditionList.get(i).serialize());
+        if (i!=shutdownConditionList.size()-1) {
+          builder.append(";");
+        }
+      }
+      return builder.toString();
+    }
+
+    public MultipleRoundShutdownCondition deserialize(String str) throws IOException {
+      String[] splits = str.split(";");
+      shutdownConditionList = new ArrayList<SimpleShutdownCondition>();
+      for (String split : splits) {
+        SimpleShutdownCondition condition = new SimpleShutdownCondition();
+        shutdownConditionList.add(condition.deserialize(split));
+      }
+      return this;
+    }
+
+    public SimpleShutdownCondition getSimpleShutdownCondition(int index) {
+      return shutdownConditionList.get(index);
+    }
+
+    public int size() {
+      return shutdownConditionList.size();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/701e9aa2/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
index dc26167..3f669c6 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
@@ -301,6 +301,56 @@ public class TestRecovery {
 
   }
 
+  private void testOrderedWordCountMultipleRoundRecoverying(
+          RecoveryServiceWithEventHandlingHook.MultipleRoundShutdownCondition shutdownCondition,
+          boolean enableAutoParallelism, boolean generateSplitInClient) throws Exception {
+
+    for (int i=0; i<shutdownCondition.size(); i++) {
+      SimpleShutdownCondition condition = shutdownCondition.getSimpleShutdownCondition(i);
+      LOG.info("ShutdownCondition:" + condition.getEventType()
+              + ", event=" + condition.getEvent());
+    }
+
+    String inputDirStr = "/tmp/owc-input/";
+    Path inputDir = new Path(inputDirStr);
+    Path stagingDirPath = new Path("/tmp/owc-staging-dir");
+    remoteFs.mkdirs(inputDir);
+    remoteFs.mkdirs(stagingDirPath);
+    TestTezJobs.generateOrderedWordCountInput(inputDir, remoteFs);
+
+    String outputDirStr = "/tmp/owc-output/";
+    Path outputDir = new Path(outputDirStr);
+
+    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
+    tezConf.set(TezConfiguration.TEZ_AM_RECOVERY_SERVICE_CLASS,
+            RecoveryServiceWithEventHandlingHook.class.getName());
+    tezConf.set(
+            RecoveryServiceWithEventHandlingHook.AM_RECOVERY_SERVICE_HOOK_CLASS,
+            RecoveryServiceWithEventHandlingHook.MultipleRoundRecoveryEventHook.class.getName());
+    tezConf.set(RecoveryServiceWithEventHandlingHook.MultipleRoundRecoveryEventHook.MULTIPLE_ROUND_SHUTDOWN_CONDITION,
+            shutdownCondition.serialize());
+    tezConf.setBoolean(
+            ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
+            enableAutoParallelism);
+    tezConf.setBoolean(
+            RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, false);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+    tezConf.setBoolean(
+            TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false);
+    OrderedWordCount job = new OrderedWordCount();
+    if (generateSplitInClient) {
+      Assert
+              .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{
+                      "-generateSplitInClient", inputDirStr, outputDirStr, "5"}, null) == 0);
+    } else {
+      Assert
+              .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{
+                      inputDirStr, outputDirStr, "5"}, null) == 0);
+    }
+    TestTezJobs.verifyOutput(outputDir, remoteFs);
+  }
+
   @Test(timeout = 1800000)
   public void testRecovery_HashJoin() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
@@ -501,4 +551,77 @@ public class TestRecovery {
     assertTrue(shutdownCondition.match(lastEvent));
   }
 
+  @Test(timeout = 1800000)
+  public void testTwoRoundsRecoverying() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
+            1);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexId0 = TezVertexID.getInstance(dagId, 0);
+    TezVertexID vertexId1 = TezVertexID.getInstance(dagId, 1);
+    TezVertexID vertexId2 = TezVertexID.getInstance(dagId, 2);
+    ContainerId containerId = ContainerId.newInstance(
+            ApplicationAttemptId.newInstance(appId, 1), 1);
+    NodeId nodeId = NodeId.newInstance("localhost", 10);
+    List<TezEvent> initGeneratedEvents = Lists.newArrayList(
+            new TezEvent(InputDataInformationEvent.createWithObjectPayload(0, new Object()), null));
+
+
+    List<SimpleShutdownCondition> shutdownConditions = Lists.newArrayList(
+
+            new SimpleShutdownCondition(TIMING.POST, new DAGInitializedEvent(
+                    dagId, 0L, "username", "dagName", null)),
+            new SimpleShutdownCondition(TIMING.POST, new DAGStartedEvent(dagId,
+                    0L, "username", "dagName")),
+            new SimpleShutdownCondition(TIMING.POST,
+                    new VertexInitializedEvent(vertexId0, "Tokenizer", 0L, 0L, 0,
+                            "", null, initGeneratedEvents)),
+            new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent(
+                    vertexId0, 0L, 0L)),
+            new SimpleShutdownCondition(TIMING.POST,
+                    new VertexConfigurationDoneEvent(vertexId0, 0L, 2, null, null,
+                            null, true)),
+            new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(
+                    TezTaskID.getInstance(vertexId0, 0), "vertexName", 0L, 0L)),
+            new SimpleShutdownCondition(TIMING.POST,
+                    new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(
+                            TezTaskID.getInstance(vertexId0, 0), 0), "vertexName", 0L,
+                            containerId, nodeId, "", "", "")),
+            new SimpleShutdownCondition(TIMING.POST, new TaskFinishedEvent(
+                    TezTaskID.getInstance(vertexId0, 0), "vertexName", 0L, 0L,
+                    null, TaskState.SUCCEEDED, "", new TezCounters(), 0)),
+            new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
+                    vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
+                    VertexState.SUCCEEDED, "", new TezCounters(),
+                    new VertexStats(), new HashMap<String, Integer>())),
+            new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
+                    vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
+                    VertexState.SUCCEEDED, "", new TezCounters(),
+                    new VertexStats(), new HashMap<String, Integer>())),
+            new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
+                    vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
+                    VertexState.SUCCEEDED, "", new TezCounters(),
+                    new VertexStats(), new HashMap<String, Integer>())),
+            new SimpleShutdownCondition(TIMING.POST, new DAGFinishedEvent(
+                    dagId, 0L, 0L, DAGState.SUCCEEDED, "", new TezCounters(),
+                    "username", "dagName", new HashMap<String, Integer>(),
+                    ApplicationAttemptId.newInstance(appId, 1), null))
+
+    );
+
+    Random rand = new Random();
+    for (int i = 0; i < shutdownConditions.size() - 1; i++) {
+      // randomly choose half of the test scenario to avoid
+      // timeout.
+      if (rand.nextDouble()<0.5) {
+        int nextSimpleConditionIndex = i + 1 + rand.nextInt(shutdownConditions.size() - i - 1);
+        if (nextSimpleConditionIndex == shutdownConditions.size() - 1) {
+          testOrderedWordCountMultipleRoundRecoverying(
+                  new RecoveryServiceWithEventHandlingHook.MultipleRoundShutdownCondition(
+                          Lists.newArrayList(shutdownConditions.get(i), shutdownConditions.get(nextSimpleConditionIndex)))
+                  , true,
+                  shutdownConditions.get(i).getHistoryEvent().getEventType() == HistoryEventType.VERTEX_STARTED);
+        }
+      }
+    }
+  }
 }