You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/11/10 00:32:20 UTC
incubator-gobblin git commit: [GOBBLIN-310] Skip rerunning completed
tasks on mapper reattempts
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 1b7748a68 -> f957934a1
[GOBBLIN-310] Skip rerunning completed tasks on mapper reattempts
Closes #2165 from htran1/multitask_commit_fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f957934a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f957934a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f957934a
Branch: refs/heads/master
Commit: f957934a179a773f99facc371efe6ebb15c0739a
Parents: 1b7748a
Author: Hung Tran <hu...@linkedin.com>
Authored: Thu Nov 9 16:32:12 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Nov 9 16:32:12 2017 -0800
----------------------------------------------------------------------
.../runtime/GobblinMultiTaskAttempt.java | 44 +++++++++++++-
.../gobblin/runtime/JobLauncherTestHelper.java | 39 +++++++++++++
.../runtime/mapreduce/MRJobLauncherTest.java | 60 ++++++++++++++++++++
3 files changed, 142 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f957934a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 583440e..aa42121 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -78,6 +78,7 @@ public class GobblinMultiTaskAttempt {
CUSTOMIZED
}
+ private static final String TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX = ".suc";
private final Logger log;
private final Iterator<WorkUnit> workUnits;
private final String jobId;
@@ -240,6 +241,14 @@ public class GobblinMultiTaskAttempt {
log.error(String.format("Task %s failed due to exception: %s", task.getTaskId(),
task.getTaskState().getProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY)));
}
+
+ // If there are task failures then the tasks may be reattempted. Save a copy of the task state that is used
+ // to filter out successful tasks on subsequent attempts.
+ if (task.getTaskState().getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL ||
+ task.getTaskState().getWorkingState() == WorkUnitState.WorkingState.COMMITTED) {
+ taskStateStore.put(task.getJobId(), task.getTaskId() + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX,
+ task.getTaskState());
+ }
}
throw new IOException(
@@ -272,6 +281,33 @@ public class GobblinMultiTaskAttempt {
}
/**
+ * Determine if the task executed successfully in a prior attempt by checking the task state store for the success
+ * marker.
+ * @param taskId task id to check
+ * @return whether the task was processed successfully in a prior attempt
+ */
+ private boolean taskSuccessfulInPriorAttempt(String taskId) {
+ if (this.taskStateStoreOptional.isPresent()) {
+ StateStore<TaskState> taskStateStore = this.taskStateStoreOptional.get();
+ // Check whether a prior attempt left a success marker for this task.
+ // The marker is written when a task succeeds in an attempt that had task failures.
+ try {
+ if (taskStateStore.exists(jobId, taskId + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX)) {
+ log.info("Skipping task {} that successfully executed in a prior attempt.", taskId);
+
+ // skip tasks that executed successfully in a previous attempt
+ return true;
+ }
+ } catch (IOException e) {
+ // if an error occurs while looking up the task state store, treat the task as not processed
+ return false;
+ }
+ }
+
+ return false;
+ }
+
+ /**
* Run a given list of {@link WorkUnit}s of a job.
*
* <p>
@@ -287,8 +323,14 @@ public class GobblinMultiTaskAttempt {
List<Task> tasks = Lists.newArrayList();
while (this.workUnits.hasNext()) {
WorkUnit workUnit = this.workUnits.next();
- countDownLatch.countUp();
String taskId = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY);
+
+ // skip tasks that executed successfully in a prior attempt
+ if (taskSuccessfulInPriorAttempt(taskId)) {
+ continue;
+ }
+
+ countDownLatch.countUp();
SubscopedBrokerBuilder<GobblinScopeTypes, ?> taskBrokerBuilder =
this.jobBroker.newSubscopedBuilder(new TaskScopeInstance(taskId));
WorkUnitState workUnitState = new WorkUnitState(workUnit, this.jobState, taskBrokerBuilder);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f957934a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
index 5a3c631..497fd88 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
@@ -222,6 +222,45 @@ public class JobLauncherTestHelper {
}
}
+ /**
+ * Test when a task with the matching suffix is skipped.
+ * @param jobProps job properties
+ * @param skippedTaskSuffix the suffix for the task that is skipped
+ */
+ public void runTestWithSkippedTask(Properties jobProps, String skippedTaskSuffix) throws Exception {
+ String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+ String jobId = JobLauncherUtils.newJobId(jobName).toString();
+ jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId);
+ jobProps.setProperty(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL, Boolean.FALSE.toString());
+ jobProps.setProperty(ConfigurationKeys.JOB_COMMIT_POLICY_KEY, "successful");
+ jobProps.setProperty(ConfigurationKeys.MAX_TASK_RETRIES_KEY, "0");
+
+ Closer closer = Closer.create();
+ try {
+ JobLauncher jobLauncher = closer.register(JobLauncherFactory.newJobLauncher(this.launcherProps, jobProps));
+ jobLauncher.launchJob(null);
+ } finally {
+ closer.close();
+ }
+
+ List<JobState.DatasetState> datasetStateList =
+ this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
+ JobState jobState = datasetStateList.get(0);
+
+ Assert.assertEquals(jobState.getState(), JobState.RunningState.COMMITTED);
+ // one task is skipped out of 4
+ Assert.assertEquals(jobState.getCompletedTasks(), 3);
+ for (TaskState taskState : jobState.getTaskStates()) {
+ if (taskState.getTaskId().endsWith(skippedTaskSuffix)) {
+ Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.PENDING);
+ } else {
+ Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.COMMITTED);
+ Assert.assertEquals(taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN),
+ TestExtractor.TOTAL_RECORDS);
+ }
+ }
+ }
+
public void runTestWithCommitSuccessfulTasksPolicy(Properties jobProps) throws Exception {
String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
String jobId = JobLauncherUtils.newJobId(jobName).toString();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f957934a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
index 1792468..60e71a7 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
@@ -25,6 +25,7 @@ import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.jboss.byteman.contrib.bmunit.BMNGRunner;
import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -33,6 +34,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
@@ -231,6 +233,64 @@ public class MRJobLauncherTest extends BMNGRunner {
}
}
+ // This test uses byteman to check that the ".suc" files are recorded in the task state store for successful
+ // tasks when there are some task failures.
+ // static variable to count the number of task success marker files written in this test case
+ public static int sucCount1 = 0;
+ @Test
+ @BMRules(rules = {
+ @BMRule(name = "saveSuccessCount", targetClass = "org.apache.gobblin.metastore.FsStateStore",
+ targetMethod = "put", targetLocation = "AT ENTRY", condition = "$2.endsWith(\".suc\")",
+ action = "org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount1 = org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount1 + 1")
+ })
+ public void testLaunchJobWithMultiWorkUnitAndFaultyExtractor() throws Exception {
+ Properties jobProps = loadJobProps();
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+ jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithMultiWorkUnitAndFaultyExtractor");
+ jobProps.setProperty("use.multiworkunit", Boolean.toString(true));
+ try {
+ this.jobLauncherTestHelper.runTestWithCommitSuccessfulTasksPolicy(jobProps);
+
+ // three of the 4 tasks should have succeeded, so 3 suc files should have been written
+ Assert.assertEquals(sucCount1, 3);
+ } finally {
+ this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ }
+ }
+
+ // This test case checks that if a ".suc" task state file exists for a task then it is skipped.
+ // This test also checks that ".suc" files are not written when there are no task failures.
+ // static variables accessed by byteman in this test case
+ public static WorkUnitState wus = null;
+ public static int sucCount2 = 0;
+ @Test
+ @BMRules(rules = {
+ @BMRule(name = "getWorkUnitState", targetClass = "org.apache.gobblin.runtime.GobblinMultiTaskAttempt",
+ targetMethod = "runWorkUnits", targetLocation = "AFTER WRITE $taskId", condition = "$taskId.endsWith(\"_1\")",
+ action = "org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.wus = new org.apache.gobblin.configuration.WorkUnitState($workUnit, $0.jobState)"),
+ @BMRule(name = "saveSuccessCount", targetClass = "org.apache.gobblin.metastore.FsStateStore",
+ targetMethod = "put", targetLocation = "AT ENTRY", condition = "$2.endsWith(\".suc\")",
+ action = "org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount2 = org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount2 + 1"),
+ @BMRule(name = "writeSuccessFile", targetClass = "org.apache.gobblin.runtime.GobblinMultiTaskAttempt",
+ targetMethod = "taskSuccessfulInPriorAttempt", targetLocation = "AFTER WRITE $taskStateStore",
+ condition = "$1.endsWith(\"_1\")",
+ action = "$taskStateStore.put($0.jobId, $1 + \".suc\", new org.apache.gobblin.runtime.TaskState(org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.wus))")
+ })
+ public void testLaunchJobWithMultiWorkUnitAndSucFile() throws Exception {
+ Properties jobProps = loadJobProps();
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+ jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithMultiWorkUnitAndSucFile");
+ jobProps.setProperty("use.multiworkunit", Boolean.toString(true));
+ try {
+ this.jobLauncherTestHelper.runTestWithSkippedTask(jobProps, "_1");
+
+ // no failures, so the only success file written is the injected one
+ Assert.assertEquals(sucCount2, 1);
+ } finally {
+ this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ }
+ }
+
@Test
public void testLaunchJobWithMultipleDatasets() throws Exception {
Properties jobProps = loadJobProps();