You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to common-commits@hadoop.apache.org by je...@apache.org on 2020/01/28 23:26:25 UTC

[hadoop] branch branch-2.10 updated: MAPREDUCE-7259. testSpeculateSuccessfulWithUpdateEvents fails Intermittently

This is an automated email from the ASF dual-hosted git repository.

jeagles pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 006b0d7  MAPREDUCE-7259. testSpeculateSuccessfulWithUpdateEvents fails Intermittently
006b0d7 is described below

commit 006b0d70a30054a172d76b5b03ce9d52fc0ff153
Author: Ahmed Hussein <ah...@apache.org>
AuthorDate: Tue Jan 28 17:26:13 2020 -0600

    MAPREDUCE-7259. testSpeculateSuccessfulWithUpdateEvents fails Intermittently
    
    Signed-off-by: Jonathan Eagles <je...@gmail.com>
---
 .../org/apache/hadoop/mapreduce/v2/app/MRApp.java  |  31 ---
 .../v2/TestSpeculativeExecutionWithMRApp.java      | 308 +++++++++------------
 2 files changed, 133 insertions(+), 206 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index e6e971a..fd012b2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -18,18 +18,14 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
-import com.google.common.base.Joiner;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.EnumSet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
@@ -376,33 +372,6 @@ public class MRApp extends MRAppMaster {
         report.getTaskAttemptState());
   }
 
-  public void waitForState(TaskAttempt attempt,
-      TaskAttemptState...finalStates) throws Exception {
-    int timeoutSecs = 0;
-    TaskAttemptReport report = attempt.getReport();
-    List<TaskAttemptState> targetStates =  Arrays.asList(finalStates);
-    List<String> stateValuesList = new ArrayList<>();
-    for (TaskAttemptState taState : targetStates) {
-      stateValuesList.add(taState.toString());
-    }
-    String statesValues = Joiner.on(",").join(stateValuesList);
-    while (!targetStates.contains(report.getTaskAttemptState()) &&
-        timeoutSecs++ < 20) {
-      System.out.println(
-          "TaskAttempt " + attempt.getID().toString() + "  State is : "
-              + report.getTaskAttemptState()
-              + " Waiting for states: " + statesValues
-              + ". curent state is : " + report.getTaskAttemptState()
-              + ".   progress : " + report.getProgress());
-      report = attempt.getReport();
-      Thread.sleep(500);
-    }
-    System.out.println("TaskAttempt State is : "
-        + report.getTaskAttemptState());
-    Assert.assertTrue("TaskAttempt state is not correct (timedout)",
-        targetStates.contains(report.getTaskAttemptState()));
-  }
-
   public void waitForState(Task task, TaskState finalState) throws Exception {
     int timeoutSecs = 0;
     TaskReport report = task.getReport();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
index d4d432b..5acee7a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
@@ -18,16 +18,14 @@
 
 package org.apache.hadoop.mapreduce.v2;
 
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
+import com.google.common.base.Supplier;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator;
@@ -50,18 +48,12 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ControlledClock;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.junit.Rule;
+import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.base.Supplier;
-import org.junit.rules.TestRule;
-import org.junit.runner.Description;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.model.Statement;
 
 /**
  * The type Test speculative execution with mr app.
@@ -70,74 +62,11 @@ import org.junit.runners.model.Statement;
 @SuppressWarnings({ "unchecked", "rawtypes" })
 @RunWith(Parameterized.class)
 public class TestSpeculativeExecutionWithMRApp {
-  /** Number of times to re-try the failing tests. */
-  private static final int ASSERT_SPECULATIONS_COUNT_RETRIES = 3;
   private static final int NUM_MAPPERS = 5;
   private static final int NUM_REDUCERS = 0;
 
   /**
-   * Speculation has non-deterministic behavior due to racing and timing. Use
-   * retry to verify that junit tests can pass.
-   */
-  @Retention(RetentionPolicy.RUNTIME)
-  public @interface Retry {}
-
-  /**
-   * The type Retry rule.
-   */
-  class RetryRule implements TestRule {
-
-    private AtomicInteger retryCount;
-
-    /**
-     * Instantiates a new Retry rule.
-     *
-     * @param retries the retries
-     */
-    RetryRule(int retries) {
-      super();
-      this.retryCount = new AtomicInteger(retries);
-    }
-
-    @Override
-    public Statement apply(final Statement base,
-        final Description description) {
-      return new Statement() {
-        @Override
-        public void evaluate() throws Throwable {
-          Throwable caughtThrowable = null;
-
-          while (retryCount.getAndDecrement() > 0) {
-            try {
-              base.evaluate();
-              return;
-            } catch (Throwable t) {
-              if (retryCount.get() > 0 &&
-                  description.getAnnotation(Retry.class) != null) {
-                caughtThrowable = t;
-                System.out.println(
-                    description.getDisplayName() +
-                        ": Failed, " +
-                        retryCount.toString() +
-                        " retries remain");
-              } else {
-                throw caughtThrowable;
-              }
-            }
-          }
-        }
-      };
-    }
-  }
-
-  /**
-   * The Rule.
-   */
-  @Rule
-  public RetryRule rule = new RetryRule(ASSERT_SPECULATIONS_COUNT_RETRIES);
-
-  /**
-   * Gets test parameters.
+   * Get test parameters.
    *
    * @return the test parameters
    */
@@ -151,6 +80,7 @@ public class TestSpeculativeExecutionWithMRApp {
 
   private Class<? extends TaskRuntimeEstimator> estimatorClass;
 
+  private final ControlledClock controlledClk;
   /**
    * Instantiates a new Test speculative execution with mr app.
    *
@@ -159,6 +89,12 @@ public class TestSpeculativeExecutionWithMRApp {
   public TestSpeculativeExecutionWithMRApp(
       Class<? extends TaskRuntimeEstimator>  estimatorKlass) {
     this.estimatorClass = estimatorKlass;
+    this.controlledClk = new ControlledClock();
+  }
+
+  @Before
+  public void setup() {
+    this.controlledClk.setTime(System.currentTimeMillis());
   }
 
   /**
@@ -166,16 +102,11 @@ public class TestSpeculativeExecutionWithMRApp {
    *
    * @throws Exception the exception
    */
-  @Retry
   @Test (timeout = 360000)
   public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception {
-
-    Clock actualClock = SystemClock.getInstance();
-    final ControlledClock clock = new ControlledClock(actualClock);
-    clock.setTime(System.currentTimeMillis());
-
     MRApp app =
-        new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock);
+        new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true,
+            controlledClk);
     Job job = app.submit(createConfiguration(), true, true);
     app.waitForState(job, JobState.RUNNING);
 
@@ -187,19 +118,13 @@ public class TestSpeculativeExecutionWithMRApp {
       app.waitForState(taskIter.next(), TaskState.RUNNING);
     }
 
-    // Process the update events
-    clock.setTime(System.currentTimeMillis() + 2000);
+    // Process the update events.
+    controlledClk.tickMsec(1000L);
     EventHandler appEventHandler = app.getContext().getEventHandler();
     for (Map.Entry<TaskId, Task> mapTask : tasks.entrySet()) {
       for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : mapTask
         .getValue().getAttempts().entrySet()) {
-        TaskAttemptStatus status =
-            createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.8,
-              TaskAttemptState.RUNNING);
-        TaskAttemptStatusUpdateEvent event =
-            new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
-                new AtomicReference<>(status));
-        appEventHandler.handle(event);
+        updateTaskProgress(appEventHandler, taskAttempt.getValue(), 0.8f);
       }
     }
 
@@ -210,34 +135,26 @@ public class TestSpeculativeExecutionWithMRApp {
 
     // Other than one random task, finish every other task.
     for (Map.Entry<TaskId, Task> mapTask : tasks.entrySet()) {
-      for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : mapTask
-        .getValue().getAttempts().entrySet()) {
-        if (mapTask.getKey() != taskToBeSpeculated.getID()) {
-          appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
-            TaskAttemptEventType.TA_DONE));
-          appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
-            TaskAttemptEventType.TA_CONTAINER_COMPLETED));
-          app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED,
-              TaskAttemptState.KILLED);
+      if (mapTask.getKey() != taskToBeSpeculated.getID()) {
+        for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : mapTask
+            .getValue().getAttempts().entrySet()) {
+          TaskAttemptId taId = taskAttempt.getKey();
+          if (taId.getId() > 0) {
+            // in case the speculator started a speculative TA, then skip it.
+            continue;
+          }
+          markTACompleted(appEventHandler, taskAttempt.getValue());
+          waitForTAState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED,
+              controlledClk);
         }
       }
     }
-
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        if (taskToBeSpeculated.getAttempts().size() != 2) {
-          clock.setTime(System.currentTimeMillis() + 1000);
-          return false;
-        } else {
-          return true;
-        }
-      }
-    }, 1000, 60000);
+    controlledClk.tickMsec(2000L);
+    waitForSpeculation(taskToBeSpeculated, controlledClk);
     // finish 1st TA, 2nd will be killed
     TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated);
-    verifySpeculationMessage(app, ta);
-    app.waitForState(Service.STATE.STOPPED);
+    waitForTAState(ta[0], TaskAttemptState.SUCCEEDED, controlledClk);
+    waitForAppStop(app, controlledClk);
   }
 
   /**
@@ -245,16 +162,11 @@ public class TestSpeculativeExecutionWithMRApp {
    *
    * @throws Exception the exception
    */
-  @Retry
   @Test (timeout = 360000)
   public void testSpeculateSuccessfulWithUpdateEvents() throws Exception {
-
-    Clock actualClock = SystemClock.getInstance();
-    final ControlledClock clock = new ControlledClock(actualClock);
-    clock.setTime(System.currentTimeMillis());
-
     MRApp app =
-        new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock);
+        new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true,
+            controlledClk);
     Job job = app.submit(createConfiguration(), true, true);
     app.waitForState(job, JobState.RUNNING);
 
@@ -266,103 +178,77 @@ public class TestSpeculativeExecutionWithMRApp {
       app.waitForState(taskIter.next(), TaskState.RUNNING);
     }
 
-    // Process the update events
-    clock.setTime(System.currentTimeMillis() + 1000);
+    // process the update events. Note that we should avoid advancing the clock
+    // by a value that triggers a speculation scan while updating the task
+    // progress, because the speculator may concurrently speculate tasks before
+    // we update their progress.
+    controlledClk.tickMsec(2000L);
     EventHandler appEventHandler = app.getContext().getEventHandler();
     for (Map.Entry<TaskId, Task> mapTask : tasks.entrySet()) {
       for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : mapTask
         .getValue().getAttempts().entrySet()) {
-        TaskAttemptStatus status =
-            createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.5,
-              TaskAttemptState.RUNNING);
-        TaskAttemptStatusUpdateEvent event =
-            new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
-                new AtomicReference<>(status));
-        appEventHandler.handle(event);
+        updateTaskProgress(appEventHandler, taskAttempt.getValue(), 0.5f);
       }
     }
 
     Task speculatedTask = null;
     int numTasksToFinish = NUM_MAPPERS + NUM_REDUCERS - 1;
-    clock.setTime(System.currentTimeMillis() + 1000);
+    controlledClk.tickMsec(1000L);
     for (Map.Entry<TaskId, Task> task : tasks.entrySet()) {
       for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : task.getValue()
         .getAttempts().entrySet()) {
-        if (numTasksToFinish > 0) {
-          appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
-            TaskAttemptEventType.TA_DONE));
-          appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
-            TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+        TaskAttemptId taId = taskAttempt.getKey();
+        if (numTasksToFinish > 0 && taId.getId() == 0) {
+          // Skip speculative attempts if any.
+          markTACompleted(appEventHandler, taskAttempt.getValue());
           numTasksToFinish--;
-          app.waitForState(taskAttempt.getValue(), TaskAttemptState.KILLED,
-              TaskAttemptState.SUCCEEDED);
+          waitForTAState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED,
+              controlledClk);
         } else {
           // The last task is chosen for speculation
-          TaskAttemptStatus status =
-              createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75,
-                TaskAttemptState.RUNNING);
           speculatedTask = task.getValue();
-          TaskAttemptStatusUpdateEvent event =
-              new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
-                  new AtomicReference<>(status));
-          appEventHandler.handle(event);
+          updateTaskProgress(appEventHandler, taskAttempt.getValue(), 0.75f);
         }
       }
     }
 
-    clock.setTime(System.currentTimeMillis() + 15000);
+    controlledClk.tickMsec(15000L);
 
     for (Map.Entry<TaskId, Task> task : tasks.entrySet()) {
       for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : task.getValue()
         .getAttempts().entrySet()) {
+        // Skip task attempts that are finished or killed.
         if (!(taskAttempt.getValue().getState() == TaskAttemptState.SUCCEEDED
             || taskAttempt.getValue().getState() == TaskAttemptState.KILLED)) {
-          TaskAttemptStatus status =
-              createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75,
-                TaskAttemptState.RUNNING);
-          TaskAttemptStatusUpdateEvent event =
-              new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
-                  new AtomicReference<>(status));
-          appEventHandler.handle(event);
+          updateTaskProgress(appEventHandler, taskAttempt.getValue(), 0.75f);
         }
       }
     }
 
     final Task speculatedTaskConst = speculatedTask;
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        if (speculatedTaskConst.getAttempts().size() != 2) {
-          clock.setTime(System.currentTimeMillis() + 1000);
-          return false;
-        } else {
-          return true;
-        }
-      }
-    }, 1000, 60000);
+    waitForSpeculation(speculatedTaskConst, controlledClk);
+
     TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, speculatedTask);
-    verifySpeculationMessage(app, ta);
-    app.waitForState(Service.STATE.STOPPED);
+    waitForTAState(ta[0], TaskAttemptState.SUCCEEDED, controlledClk);
+    waitForAppStop(app, controlledClk);
   }
 
   private static TaskAttempt[] makeFirstAttemptWin(
       EventHandler appEventHandler, Task speculatedTask) {
-
     // finish 1st TA, 2nd will be killed
     Collection<TaskAttempt> attempts = speculatedTask.getAttempts().values();
     TaskAttempt[] ta = new TaskAttempt[attempts.size()];
     attempts.toArray(ta);
-    appEventHandler.handle(
-        new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE));
-    appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(),
-        TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+    markTACompleted(appEventHandler, ta[0]);
     return ta;
   }
 
-  private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta)
-      throws Exception {
-    app.waitForState(ta[0], TaskAttemptState.SUCCEEDED);
-    // The speculative attempt may be not killed before the MR job succeeds.
+  private static void markTACompleted(
+      EventHandler appEventHandler, TaskAttempt attempt) {
+    appEventHandler.handle(
+        new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
+    appEventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+        TaskAttemptEventType.TA_CONTAINER_COMPLETED));
   }
 
   private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,
@@ -387,6 +273,78 @@ public class TestSpeculativeExecutionWithMRApp {
           MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS,
           1000L * 10);
     }
+    conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE,
+        3000L);
     return conf;
   }
+
+  /**
+   * Wait for MRapp to stop while incrementing the controlled clock.
+   * @param app the MRApp to be stopped.
+   * @param cClock the controlled clock of the test.
+   * @throws TimeoutException
+   * @throws InterruptedException
+   */
+  private void waitForAppStop(final MRApp app, final ControlledClock cClock)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        if (app.getServiceState()
+            != Service.STATE.STOPPED) {
+          cClock.tickMsec(250L);
+          return false;
+        }
+        return true;
+      }
+    }, 250, 60000);
+  }
+
+  /**
+   * Wait for the task to trigger a new speculation.
+   * @param speculatedTask the task we are monitoring.
+   * @param cClock the controlled clock of the test.
+   * @throws TimeoutException
+   * @throws InterruptedException
+   */
+  private void waitForSpeculation(final Task speculatedTask,
+      final ControlledClock cClock)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        if (speculatedTask.getAttempts().size() != 2) {
+          cClock.tickMsec(250L);
+          return false;
+        }
+        return true;
+      }
+    }, 250, 60000);
+  }
+
+  public void waitForTAState(final TaskAttempt attempt,
+      final TaskAttemptState finalState, final ControlledClock cClock)
+      throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        if (attempt.getReport().getTaskAttemptState() != finalState) {
+          cClock.tickMsec(250L);
+          return false;
+        }
+        return true;
+      }
+    }, 250, 10000);
+  }
+
+  private void updateTaskProgress(EventHandler appEventHandler,
+      TaskAttempt attempt, float newProgress) {
+    TaskAttemptStatus status =
+        createTaskAttemptStatus(attempt.getID(), newProgress,
+            TaskAttemptState.RUNNING);
+    TaskAttemptStatusUpdateEvent event =
+        new TaskAttemptStatusUpdateEvent(attempt.getID(),
+            new AtomicReference<>(status));
+    appEventHandler.handle(event);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org