You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by je...@apache.org on 2020/01/28 17:04:02 UTC
[hadoop] branch branch-3.2 updated: MAPREDUCE-7259. testSpeculateSuccessfulWithUpdateEvents fails Intermittently
This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 96caaf7 MAPREDUCE-7259. testSpeculateSuccessfulWithUpdateEvents fails Intermittently
96caaf7 is described below
commit 96caaf718508211bc8148d9c5c64b9f3c77da1b9
Author: Ahmed Hussein <ah...@apache.org>
AuthorDate: Tue Jan 28 10:57:33 2020 -0600
MAPREDUCE-7259. testSpeculateSuccessfulWithUpdateEvents fails Intermittently
Signed-off-by: Jonathan Eagles <je...@gmail.com>
(cherry picked from commit 08251538fe2550d9dd86f9daf79994f5b8bdf7fa)
---
.../org/apache/hadoop/mapreduce/v2/app/MRApp.java | 27 --
.../v2/TestSpeculativeExecutionWithMRApp.java | 298 +++++++++------------
2 files changed, 123 insertions(+), 202 deletions(-)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index 9a8280b..4be80c4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -22,11 +22,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Arrays;
import java.util.EnumSet;
-import java.util.List;
-import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
@@ -375,30 +372,6 @@ public class MRApp extends MRAppMaster {
report.getTaskAttemptState());
}
- public void waitForState(TaskAttempt attempt,
- TaskAttemptState...finalStates) throws Exception {
- int timeoutSecs = 0;
- TaskAttemptReport report = attempt.getReport();
- List<TaskAttemptState> targetStates = Arrays.asList(finalStates);
- String statesValues = targetStates.stream().map(Object::toString).collect(
- Collectors.joining(","));
- while (!targetStates.contains(report.getTaskAttemptState()) &&
- timeoutSecs++ < 20) {
- System.out.println(
- "TaskAttempt " + attempt.getID().toString() + " State is : "
- + report.getTaskAttemptState()
- + " Waiting for states: " + statesValues
- + ". curent state is : " + report.getTaskAttemptState()
- + ". progress : " + report.getProgress());
- report = attempt.getReport();
- Thread.sleep(500);
- }
- System.out.println("TaskAttempt State is : "
- + report.getTaskAttemptState());
- Assert.assertTrue("TaskAttempt state is not correct (timedout)",
- targetStates.contains(report.getTaskAttemptState()));
- }
-
public void waitForState(Task task, TaskState finalState) throws Exception {
int timeoutSecs = 0;
TaskReport report = task.getReport();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
index d4d432b..2163d7b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
@@ -18,16 +18,13 @@
package org.apache.hadoop.mapreduce.v2;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator;
@@ -50,18 +47,12 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.junit.Rule;
+import org.junit.Before;
import org.junit.Test;
-import com.google.common.base.Supplier;
-import org.junit.rules.TestRule;
-import org.junit.runner.Description;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.model.Statement;
/**
* The type Test speculative execution with mr app.
@@ -70,74 +61,11 @@ import org.junit.runners.model.Statement;
@SuppressWarnings({ "unchecked", "rawtypes" })
@RunWith(Parameterized.class)
public class TestSpeculativeExecutionWithMRApp {
- /** Number of times to re-try the failing tests. */
- private static final int ASSERT_SPECULATIONS_COUNT_RETRIES = 3;
private static final int NUM_MAPPERS = 5;
private static final int NUM_REDUCERS = 0;
/**
- * Speculation has non-deterministic behavior due to racing and timing. Use
- * retry to verify that junit tests can pass.
- */
- @Retention(RetentionPolicy.RUNTIME)
- public @interface Retry {}
-
- /**
- * The type Retry rule.
- */
- class RetryRule implements TestRule {
-
- private AtomicInteger retryCount;
-
- /**
- * Instantiates a new Retry rule.
- *
- * @param retries the retries
- */
- RetryRule(int retries) {
- super();
- this.retryCount = new AtomicInteger(retries);
- }
-
- @Override
- public Statement apply(final Statement base,
- final Description description) {
- return new Statement() {
- @Override
- public void evaluate() throws Throwable {
- Throwable caughtThrowable = null;
-
- while (retryCount.getAndDecrement() > 0) {
- try {
- base.evaluate();
- return;
- } catch (Throwable t) {
- if (retryCount.get() > 0 &&
- description.getAnnotation(Retry.class) != null) {
- caughtThrowable = t;
- System.out.println(
- description.getDisplayName() +
- ": Failed, " +
- retryCount.toString() +
- " retries remain");
- } else {
- throw caughtThrowable;
- }
- }
- }
- }
- };
- }
- }
-
- /**
- * The Rule.
- */
- @Rule
- public RetryRule rule = new RetryRule(ASSERT_SPECULATIONS_COUNT_RETRIES);
-
- /**
- * Gets test parameters.
+ * Get test parameters.
*
* @return the test parameters
*/
@@ -151,6 +79,7 @@ public class TestSpeculativeExecutionWithMRApp {
private Class<? extends TaskRuntimeEstimator> estimatorClass;
+ private final ControlledClock controlledClk;
/**
* Instantiates a new Test speculative execution with mr app.
*
@@ -159,6 +88,12 @@ public class TestSpeculativeExecutionWithMRApp {
public TestSpeculativeExecutionWithMRApp(
Class<? extends TaskRuntimeEstimator> estimatorKlass) {
this.estimatorClass = estimatorKlass;
+ this.controlledClk = new ControlledClock();
+ }
+
+ @Before
+ public void setup() {
+ this.controlledClk.setTime(System.currentTimeMillis());
}
/**
@@ -166,16 +101,11 @@ public class TestSpeculativeExecutionWithMRApp {
*
* @throws Exception the exception
*/
- @Retry
@Test (timeout = 360000)
public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception {
-
- Clock actualClock = SystemClock.getInstance();
- final ControlledClock clock = new ControlledClock(actualClock);
- clock.setTime(System.currentTimeMillis());
-
MRApp app =
- new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock);
+ new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true,
+ controlledClk);
Job job = app.submit(createConfiguration(), true, true);
app.waitForState(job, JobState.RUNNING);
@@ -187,19 +117,13 @@ public class TestSpeculativeExecutionWithMRApp {
app.waitForState(taskIter.next(), TaskState.RUNNING);
}
- // Process the update events
- clock.setTime(System.currentTimeMillis() + 2000);
+ // Process the update events.
+ controlledClk.tickMsec(1000L);
EventHandler appEventHandler = app.getContext().getEventHandler();
for (Map.Entry<TaskId, Task> mapTask : tasks.entrySet()) {
for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : mapTask
.getValue().getAttempts().entrySet()) {
- TaskAttemptStatus status =
- createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.8,
- TaskAttemptState.RUNNING);
- TaskAttemptStatusUpdateEvent event =
- new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
- new AtomicReference<>(status));
- appEventHandler.handle(event);
+ updateTaskProgress(appEventHandler, taskAttempt.getValue(), 0.8f);
}
}
@@ -210,34 +134,26 @@ public class TestSpeculativeExecutionWithMRApp {
// Other than one random task, finish every other task.
for (Map.Entry<TaskId, Task> mapTask : tasks.entrySet()) {
- for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : mapTask
- .getValue().getAttempts().entrySet()) {
- if (mapTask.getKey() != taskToBeSpeculated.getID()) {
- appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
- TaskAttemptEventType.TA_DONE));
- appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
- TaskAttemptEventType.TA_CONTAINER_COMPLETED));
- app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED,
- TaskAttemptState.KILLED);
+ if (mapTask.getKey() != taskToBeSpeculated.getID()) {
+ for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : mapTask
+ .getValue().getAttempts().entrySet()) {
+ TaskAttemptId taId = taskAttempt.getKey();
+ if (taId.getId() > 0) {
+ // in case the speculator started a speculative TA, then skip it.
+ continue;
+ }
+ markTACompleted(appEventHandler, taskAttempt.getValue());
+ waitForTAState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED,
+ controlledClk);
}
}
}
-
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- if (taskToBeSpeculated.getAttempts().size() != 2) {
- clock.setTime(System.currentTimeMillis() + 1000);
- return false;
- } else {
- return true;
- }
- }
- }, 1000, 60000);
+ controlledClk.tickMsec(2000L);
+ waitForSpeculation(taskToBeSpeculated, controlledClk);
// finish 1st TA, 2nd will be killed
TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated);
- verifySpeculationMessage(app, ta);
- app.waitForState(Service.STATE.STOPPED);
+ waitForTAState(ta[0], TaskAttemptState.SUCCEEDED, controlledClk);
+ waitForAppStop(app, controlledClk);
}
/**
@@ -245,16 +161,11 @@ public class TestSpeculativeExecutionWithMRApp {
*
* @throws Exception the exception
*/
- @Retry
@Test (timeout = 360000)
public void testSpeculateSuccessfulWithUpdateEvents() throws Exception {
-
- Clock actualClock = SystemClock.getInstance();
- final ControlledClock clock = new ControlledClock(actualClock);
- clock.setTime(System.currentTimeMillis());
-
MRApp app =
- new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock);
+ new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true,
+ controlledClk);
Job job = app.submit(createConfiguration(), true, true);
app.waitForState(job, JobState.RUNNING);
@@ -266,103 +177,77 @@ public class TestSpeculativeExecutionWithMRApp {
app.waitForState(taskIter.next(), TaskState.RUNNING);
}
- // Process the update events
- clock.setTime(System.currentTimeMillis() + 1000);
+ // process the update events. Note that we should avoid advancing the clock
+ // by a value that triggers a speculation scan while updating the task
+ // progress, because the speculator may concurrently speculate tasks before
+ // we update their progress.
+ controlledClk.tickMsec(2000L);
EventHandler appEventHandler = app.getContext().getEventHandler();
for (Map.Entry<TaskId, Task> mapTask : tasks.entrySet()) {
for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : mapTask
.getValue().getAttempts().entrySet()) {
- TaskAttemptStatus status =
- createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.5,
- TaskAttemptState.RUNNING);
- TaskAttemptStatusUpdateEvent event =
- new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
- new AtomicReference<>(status));
- appEventHandler.handle(event);
+ updateTaskProgress(appEventHandler, taskAttempt.getValue(), 0.5f);
}
}
Task speculatedTask = null;
int numTasksToFinish = NUM_MAPPERS + NUM_REDUCERS - 1;
- clock.setTime(System.currentTimeMillis() + 1000);
+ controlledClk.tickMsec(1000L);
for (Map.Entry<TaskId, Task> task : tasks.entrySet()) {
for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : task.getValue()
.getAttempts().entrySet()) {
- if (numTasksToFinish > 0) {
- appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
- TaskAttemptEventType.TA_DONE));
- appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
- TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+ TaskAttemptId taId = taskAttempt.getKey();
+ if (numTasksToFinish > 0 && taId.getId() == 0) {
+ // Skip speculative attempts if any.
+ markTACompleted(appEventHandler, taskAttempt.getValue());
numTasksToFinish--;
- app.waitForState(taskAttempt.getValue(), TaskAttemptState.KILLED,
- TaskAttemptState.SUCCEEDED);
+ waitForTAState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED,
+ controlledClk);
} else {
// The last task is chosen for speculation
- TaskAttemptStatus status =
- createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75,
- TaskAttemptState.RUNNING);
speculatedTask = task.getValue();
- TaskAttemptStatusUpdateEvent event =
- new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
- new AtomicReference<>(status));
- appEventHandler.handle(event);
+ updateTaskProgress(appEventHandler, taskAttempt.getValue(), 0.75f);
}
}
}
- clock.setTime(System.currentTimeMillis() + 15000);
+ controlledClk.tickMsec(15000L);
for (Map.Entry<TaskId, Task> task : tasks.entrySet()) {
for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : task.getValue()
.getAttempts().entrySet()) {
+ // Skip task attempts that are finished or killed.
if (!(taskAttempt.getValue().getState() == TaskAttemptState.SUCCEEDED
|| taskAttempt.getValue().getState() == TaskAttemptState.KILLED)) {
- TaskAttemptStatus status =
- createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75,
- TaskAttemptState.RUNNING);
- TaskAttemptStatusUpdateEvent event =
- new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
- new AtomicReference<>(status));
- appEventHandler.handle(event);
+ updateTaskProgress(appEventHandler, taskAttempt.getValue(), 0.75f);
}
}
}
final Task speculatedTaskConst = speculatedTask;
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- if (speculatedTaskConst.getAttempts().size() != 2) {
- clock.setTime(System.currentTimeMillis() + 1000);
- return false;
- } else {
- return true;
- }
- }
- }, 1000, 60000);
+ waitForSpeculation(speculatedTaskConst, controlledClk);
+
TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, speculatedTask);
- verifySpeculationMessage(app, ta);
- app.waitForState(Service.STATE.STOPPED);
+ waitForTAState(ta[0], TaskAttemptState.SUCCEEDED, controlledClk);
+ waitForAppStop(app, controlledClk);
}
private static TaskAttempt[] makeFirstAttemptWin(
EventHandler appEventHandler, Task speculatedTask) {
-
// finish 1st TA, 2nd will be killed
Collection<TaskAttempt> attempts = speculatedTask.getAttempts().values();
TaskAttempt[] ta = new TaskAttempt[attempts.size()];
attempts.toArray(ta);
- appEventHandler.handle(
- new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE));
- appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(),
- TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+ markTACompleted(appEventHandler, ta[0]);
return ta;
}
- private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta)
- throws Exception {
- app.waitForState(ta[0], TaskAttemptState.SUCCEEDED);
- // The speculative attempt may be not killed before the MR job succeeds.
+ private static void markTACompleted(
+ EventHandler appEventHandler, TaskAttempt attempt) {
+ appEventHandler.handle(
+ new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
+ appEventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED));
}
private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,
@@ -387,6 +272,69 @@ public class TestSpeculativeExecutionWithMRApp {
MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS,
1000L * 10);
}
+ conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE,
+ 3000L);
return conf;
}
+
+ /**
+ * Wait for MRApp to stop while incrementing the controlled clock.
+ * @param app the MRApp to be stopped.
+ * @param cClock the controlled clock of the test.
+ * @throws TimeoutException if the app has not stopped within 60 seconds.
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ private void waitForAppStop(final MRApp app, final ControlledClock cClock)
+ throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(() -> {
+ if (app.getServiceState() != Service.STATE.STOPPED) {
+ cClock.tickMsec(250L);
+ return false;
+ }
+ return true;
+ }, 250, 60000);
+ }
+
+ /**
+ * Wait for the task to trigger a new speculation.
+ * @param speculatedTask the task we are monitoring.
+ * @param cClock the controlled clock of the test.
+ * @throws TimeoutException if the task does not reach two attempts within
+ * 60 seconds.
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ private void waitForSpeculation(final Task speculatedTask,
+ final ControlledClock cClock)
+ throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(() -> {
+ if (speculatedTask.getAttempts().size() != 2) {
+ cClock.tickMsec(250L);
+ return false;
+ }
+ return true;
+ }, 250, 60000);
+ }
+
+ public void waitForTAState(TaskAttempt attempt,
+ TaskAttemptState finalState, final ControlledClock cClock)
+ throws Exception {
+ GenericTestUtils.waitFor(() -> {
+ if (attempt.getReport().getTaskAttemptState() != finalState) {
+ cClock.tickMsec(250L);
+ return false;
+ }
+ return true;
+ }, 250, 10000);
+ }
+
+ private void updateTaskProgress(EventHandler appEventHandler,
+ TaskAttempt attempt, float newProgress) {
+ TaskAttemptStatus status =
+ createTaskAttemptStatus(attempt.getID(), newProgress,
+ TaskAttemptState.RUNNING);
+ TaskAttemptStatusUpdateEvent event =
+ new TaskAttemptStatusUpdateEvent(attempt.getID(),
+ new AtomicReference<>(status));
+ appEventHandler.handle(event);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org