You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/11/10 00:32:20 UTC

incubator-gobblin git commit: [GOBBLIN-310] Skip rerunning completed tasks on mapper reattempts

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 1b7748a68 -> f957934a1


[GOBBLIN-310] Skip rerunning completed tasks on mapper reattempts

Closes #2165 from htran1/multitask_commit_fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f957934a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f957934a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f957934a

Branch: refs/heads/master
Commit: f957934a179a773f99facc371efe6ebb15c0739a
Parents: 1b7748a
Author: Hung Tran <hu...@linkedin.com>
Authored: Thu Nov 9 16:32:12 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Nov 9 16:32:12 2017 -0800

----------------------------------------------------------------------
 .../runtime/GobblinMultiTaskAttempt.java        | 44 +++++++++++++-
 .../gobblin/runtime/JobLauncherTestHelper.java  | 39 +++++++++++++
 .../runtime/mapreduce/MRJobLauncherTest.java    | 60 ++++++++++++++++++++
 3 files changed, 142 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f957934a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 583440e..aa42121 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -78,6 +78,7 @@ public class GobblinMultiTaskAttempt {
     CUSTOMIZED
   }
 
+  private static final String TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX = ".suc";
   private final Logger log;
   private final Iterator<WorkUnit> workUnits;
   private final String jobId;
@@ -240,6 +241,14 @@ public class GobblinMultiTaskAttempt {
           log.error(String.format("Task %s failed due to exception: %s", task.getTaskId(),
               task.getTaskState().getProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY)));
         }
+
+        // If there are task failures then the tasks may be reattempted. Save a copy of the task state that is used
+        // to filter out successful tasks on subsequent attempts.
+        if (task.getTaskState().getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL ||
+            task.getTaskState().getWorkingState() == WorkUnitState.WorkingState.COMMITTED) {
+          taskStateStore.put(task.getJobId(), task.getTaskId() + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX,
+              task.getTaskState());
+        }
       }
 
       throw new IOException(
@@ -272,6 +281,33 @@ public class GobblinMultiTaskAttempt {
   }
 
   /**
+   * Determine if the task executed successfully in a prior attempt by checking the task state store for the success
+   * marker. A failed lookup is logged and treated as if the task had not been processed.
+   * @param taskId task id to check
+   * @return whether the task was processed successfully in a prior attempt
+   */
+  private boolean taskSuccessfulInPriorAttempt(String taskId) {
+    if (this.taskStateStoreOptional.isPresent()) {
+      StateStore<TaskState> taskStateStore = this.taskStateStoreOptional.get();
+      // A success marker is saved for each successful task of an attempt in which other tasks failed, so finding
+      // one here means a prior attempt already completed this task.
+      try {
+        if (taskStateStore.exists(jobId, taskId + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX)) {
+          log.info("Skipping task {} that successfully executed in a prior attempt.", taskId);
+          // skip tasks that executed successfully in a previous attempt
+          return true;
+        }
+      } catch (IOException e) {
+        // if an error while looking up the task state store then treat like it was not processed
+        log.warn("Failed to look up success marker for task " + taskId + "; the task will be rerun.", e);
+        return false;
+      }
+    }
+
+    return false;
+  }
+
+  /**
    * Run a given list of {@link WorkUnit}s of a job.
    *
    * <p>
@@ -287,8 +323,14 @@ public class GobblinMultiTaskAttempt {
     List<Task> tasks = Lists.newArrayList();
     while (this.workUnits.hasNext()) {
       WorkUnit workUnit = this.workUnits.next();
-      countDownLatch.countUp();
       String taskId = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY);
+
+      // skip tasks that executed successfully in a prior attempt
+      if (taskSuccessfulInPriorAttempt(taskId)) {
+        continue;
+      }
+
+      countDownLatch.countUp();
       SubscopedBrokerBuilder<GobblinScopeTypes, ?> taskBrokerBuilder =
           this.jobBroker.newSubscopedBuilder(new TaskScopeInstance(taskId));
       WorkUnitState workUnitState = new WorkUnitState(workUnit, this.jobState, taskBrokerBuilder);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f957934a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
index 5a3c631..497fd88 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
@@ -222,6 +222,45 @@ public class JobLauncherTestHelper {
     }
   }
 
+  /**
+   * Launch a job and verify the outcome when the task whose id ends with the given suffix is skipped.
+   * @param jobProps job properties
+   * @param skippedTaskSuffix the suffix of the id of the task that is expected to be skipped
+   */
+  public void runTestWithSkippedTask(Properties jobProps, String skippedTaskSuffix) throws Exception {
+    String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+    String jobId = JobLauncherUtils.newJobId(jobName).toString();
+    jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId);
+    jobProps.setProperty(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL, Boolean.FALSE.toString());
+    jobProps.setProperty(ConfigurationKeys.JOB_COMMIT_POLICY_KEY, "successful");
+    jobProps.setProperty(ConfigurationKeys.MAX_TASK_RETRIES_KEY, "0");
+
+    Closer closer = Closer.create();
+    try {
+      JobLauncher jobLauncher = closer.register(JobLauncherFactory.newJobLauncher(this.launcherProps, jobProps));
+      jobLauncher.launchJob(null);
+    } finally {
+      closer.close();
+    }
+
+    List<JobState.DatasetState> datasetStateList =
+        this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
+    JobState jobState = datasetStateList.get(0);
+
+    Assert.assertEquals(jobState.getState(), JobState.RunningState.COMMITTED);
+    // one task is skipped out of 4, so only 3 tasks are expected to complete
+    Assert.assertEquals(jobState.getCompletedTasks(), 3);
+    for (TaskState taskState : jobState.getTaskStates()) {
+      if (taskState.getTaskId().endsWith(skippedTaskSuffix)) {
+        Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.PENDING);
+      } else {
+        Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.COMMITTED);
+        Assert.assertEquals(taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN),
+            TestExtractor.TOTAL_RECORDS);
+      }
+    }
+  }
+
   public void runTestWithCommitSuccessfulTasksPolicy(Properties jobProps) throws Exception {
     String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
     String jobId = JobLauncherUtils.newJobId(jobName).toString();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f957934a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
index 1792468..60e71a7 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
@@ -25,6 +25,7 @@ import java.util.Properties;
 import org.apache.commons.io.FileUtils;
 import org.jboss.byteman.contrib.bmunit.BMNGRunner;
 import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -33,6 +34,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
@@ -231,6 +233,64 @@ public class MRJobLauncherTest extends BMNGRunner {
     }
   }
 
+  // This test uses Byteman to check that ".suc" files are recorded in the task state store for successful tasks
+  // when there are some task failures; the rule counts FsStateStore.put calls whose second argument ends in ".suc".
+  // static counter, incremented by the Byteman rule, of the task success marker files written in this test case
+  public static int sucCount1 = 0;
+  @Test
+  @BMRules(rules = {
+      @BMRule(name = "saveSuccessCount", targetClass = "org.apache.gobblin.metastore.FsStateStore",
+          targetMethod = "put", targetLocation = "AT ENTRY", condition = "$2.endsWith(\".suc\")",
+          action = "org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount1 = org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount1 + 1")
+  })
+  public void testLaunchJobWithMultiWorkUnitAndFaultyExtractor() throws Exception {
+    Properties jobProps = loadJobProps();
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+        jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithMultiWorkUnitAndFaultyExtractor");
+    jobProps.setProperty("use.multiworkunit", Boolean.toString(true));
+    try {
+      this.jobLauncherTestHelper.runTestWithCommitSuccessfulTasksPolicy(jobProps);
+
+      // three of the 4 tasks should have succeeded, so 3 suc files should have been written
+      Assert.assertEquals(sucCount1, 3);
+    } finally {
+      this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+    }
+  }
+
+  // This test case checks that a task is skipped if a ".suc" task state file already exists for it. The file for
+  // the "_1" task is injected by Byteman, so the test also checks that no ".suc" file is written without failures.
+  // static variables accessed by Byteman in this test case
+  public static WorkUnitState wus = null;
+  public static int sucCount2 = 0;
+  @Test
+  @BMRules(rules = {
+      @BMRule(name = "getWorkUnitState", targetClass = "org.apache.gobblin.runtime.GobblinMultiTaskAttempt",
+          targetMethod = "runWorkUnits", targetLocation = "AFTER WRITE $taskId", condition = "$taskId.endsWith(\"_1\")",
+          action = "org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.wus = new org.apache.gobblin.configuration.WorkUnitState($workUnit, $0.jobState)"),
+      @BMRule(name = "saveSuccessCount", targetClass = "org.apache.gobblin.metastore.FsStateStore",
+          targetMethod = "put", targetLocation = "AT ENTRY", condition = "$2.endsWith(\".suc\")",
+          action = "org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount2 = org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount2 + 1"),
+      @BMRule(name = "writeSuccessFile", targetClass = "org.apache.gobblin.runtime.GobblinMultiTaskAttempt",
+          targetMethod = "taskSuccessfulInPriorAttempt", targetLocation = "AFTER WRITE $taskStateStore",
+          condition = "$1.endsWith(\"_1\")",
+          action = "$taskStateStore.put($0.jobId, $1 + \".suc\", new org.apache.gobblin.runtime.TaskState(org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.wus))")
+  })
+  public void testLaunchJobWithMultiWorkUnitAndSucFile() throws Exception {
+    Properties jobProps = loadJobProps();
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+        jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithMultiWorkUnitAndSucFile");
+    jobProps.setProperty("use.multiworkunit", Boolean.toString(true));
+    try {
+      this.jobLauncherTestHelper.runTestWithSkippedTask(jobProps, "_1");
+
+      // no failures, so the only success file written is the injected one
+      Assert.assertEquals(sucCount2, 1);
+    } finally {
+      this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+    }
+  }
+
   @Test
   public void testLaunchJobWithMultipleDatasets() throws Exception {
     Properties jobProps = loadJobProps();