You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/06 00:08:14 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-940] Add synchronization on workunit persistency before Helix job launching

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bf33b58  [GOBBLIN-940] Add synchronization on workunit persistency before Helix job launching
bf33b58 is described below

commit bf33b58d450d1f35e5f5cc147159710815210cd0
Author: autumnust <le...@linkedin.com>
AuthorDate: Tue Nov 5 16:08:04 2019 -0800

    [GOBBLIN-940] Add synchronization on workunit persistency before Helix job launching
    
    Closes #2789 from
    autumnust/synchronizedBeforeHelixSubmission
---
 .../gobblin/cluster/GobblinHelixJobLauncher.java   | 29 +++++++----
 gobblin-utility/build.gradle                       | 19 +++-----
 .../org/apache/gobblin/util/ParallelRunner.java    | 20 +++++++-
 .../apache/gobblin/util/ParallelRunnerTest.java    | 57 +++++++++++++++-------
 4 files changed, 85 insertions(+), 40 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index e820c1f..975037c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -219,7 +219,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
           if (helixMetrics.isPresent()) {
             helixMetrics.get().submitMeter.mark();
           }
-          submitJobToHelix(createJob(workUnits));
+          submitJobToHelix(createHelixJob(workUnits));
           if (helixMetrics.isPresent()) {
             this.helixMetrics.get().updateTimeForHelixSubmit(submitStart);
           }
@@ -271,7 +271,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
   /**
    * Create a job from a given batch of {@link WorkUnit}s.
    */
-  private JobConfig.Builder createJob(List<WorkUnit> workUnits) throws IOException {
+  JobConfig.Builder createHelixJob(List<WorkUnit> workUnits) throws IOException {
     Map<String, TaskConfig> taskConfigMap = Maps.newHashMap();
 
     try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
@@ -295,23 +295,34 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
         SerializationUtils.serializeState(this.fs, jobStateFilePath, this.jobContext.getJobState());
       }
 
-      LOGGER.debug("GobblinHelixJobLauncher.createJob: jobStateFilePath {}, jobState {} jobProperties {}",
+      // Block until all workunits have been persisted.
+      // This is necessary when the underlying storage is slow and Helix could otherwise activate task execution before the workunits are persisted.
+      stateSerDeRunner.waitForTasks();
+
+      LOGGER.debug("GobblinHelixJobLauncher.createHelixJob: jobStateFilePath {}, jobState {} jobProperties {}",
           jobStateFilePath, this.jobContext.getJobState().toString(), this.jobContext.getJobState().getProperties());
+
+      return translateGobblinJobConfigToHelixJobConfig(this.jobContext.getJobState(), workUnits, taskConfigMap);
     }
+  }
 
+  /**
+   * Populate {@link JobConfig.Builder} with relevant gobblin job-configurations.
+   */
+  JobConfig.Builder translateGobblinJobConfigToHelixJobConfig(JobState gobblinJobState, List<WorkUnit> workUnits,
+      Map<String, TaskConfig> taskConfigMap) {
     JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
 
     // Helix task attempts = retries + 1 (fallback to general task retry for backward compatibility)
-    jobConfigBuilder.setMaxAttemptsPerTask(this.jobContext.getJobState().getPropAsInt(
-        GobblinClusterConfigurationKeys.HELIX_TASK_MAX_ATTEMPTS_KEY,
-        this.jobContext.getJobState().getPropAsInt(
+    jobConfigBuilder.setMaxAttemptsPerTask(gobblinJobState.getPropAsInt(
+        GobblinClusterConfigurationKeys.HELIX_TASK_MAX_ATTEMPTS_KEY, gobblinJobState.getPropAsInt(
             ConfigurationKeys.MAX_TASK_RETRIES_KEY,
             ConfigurationKeys.DEFAULT_MAX_TASK_RETRIES)) + 1);
 
     // Helix task timeout (fallback to general task timeout for backward compatibility)
-    jobConfigBuilder.setTimeoutPerTask(this.jobContext.getJobState().getPropAsLong(
+    jobConfigBuilder.setTimeoutPerTask(gobblinJobState.getPropAsLong(
         GobblinClusterConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS,
-        this.jobContext.getJobState().getPropAsLong(
+        gobblinJobState.getPropAsLong(
             ConfigurationKeys.TASK_TIMEOUT_SECONDS,
             ConfigurationKeys.DEFAULT_TASK_TIMEOUT_SECONDS)) * 1000);
 
@@ -337,7 +348,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
       jobConfigBuilder.setRebalanceRunningTask(true);
     }
 
-    jobConfigBuilder.setExpiry(this.jobContext.getJobState().getPropAsLong(
+    jobConfigBuilder.setExpiry(gobblinJobState.getPropAsLong(
         GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS));
 
     return jobConfigBuilder;
diff --git a/gobblin-utility/build.gradle b/gobblin-utility/build.gradle
index ad49820..485641b 100644
--- a/gobblin-utility/build.gradle
+++ b/gobblin-utility/build.gradle
@@ -59,20 +59,17 @@ dependencies {
 }
 
 configurations {
-    compile {
-        transitive = true
-    }
-    archives
+  compile {
+    transitive = true
+  }
+  archives
 }
 
 test {
-    workingDir rootProject.rootDir
+  workingDir rootProject.rootDir
 }
 
-
-
-ext.classification="library"
-
+ext.classification = "library"
 
 task utilityTar(type: Tar) {
   //there seems to be a bug in the Gradle signing module where X.tar.gz will generate
@@ -83,8 +80,8 @@ task utilityTar(type: Tar) {
   compression = Compression.GZIP
 
   into("lib") { from configurations.runtime }
-  into("lib") { from "${project.rootDir}/build/${project.name}/libs/${project.name}.jar"}
-  into("lib") { from "${project.rootDir}/build/${project.name}/libs/${project.name}-${project.version}.jar"}
+  into("lib") { from "${project.rootDir}/build/${project.name}/libs/${project.name}.jar" }
+  into("lib") { from "${project.rootDir}/build/${project.name}/libs/${project.name}-${project.version}.jar" }
   into("bin") {
     from "src/main/bash"
     fileMode = 0755
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
index e2f4964..04cc21a 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
@@ -30,6 +30,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 
 import org.apache.hadoop.conf.Configuration;
@@ -81,6 +83,8 @@ public class ParallelRunner implements Closeable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ParallelRunner.class);
 
+  private static final long DEFAULT_PARALLEL_RUNNER_WAIT_ON_FINISH_TIMEOUT = 10000;
+
   public static final String PARALLEL_RUNNER_THREADS_KEY = "parallel.runner.threads";
   public static final int DEFAULT_PARALLEL_RUNNER_THREADS = 10;
 
@@ -354,7 +358,7 @@ public class ParallelRunner implements Closeable {
    * Wait for all submitted tasks to complete. The {@link ParallelRunner} can be reused after this call.
    * @throws IOException
    */
-  public void waitForTasks() throws IOException {
+  public void waitForTasks(long timeoutInMills) throws IOException {
     // Wait for all submitted tasks to complete
     boolean wasInterrupted = false;
     IOException exception = null;
@@ -363,7 +367,7 @@ public class ParallelRunner implements Closeable {
         if (wasInterrupted) {
           future.getFuture().cancel(true);
         } else {
-          future.getFuture().get();
+          future.getFuture().get(timeoutInMills, TimeUnit.MILLISECONDS);
         }
       } catch (InterruptedException ie) {
         LOGGER.warn("Task was interrupted: " + future.getName());
@@ -376,6 +380,11 @@ public class ParallelRunner implements Closeable {
         if (exception == null) {
           exception = new IOException(ee.getCause());
         }
+      } catch (TimeoutException te) {
+        LOGGER.warn("Tasks not fully finished before Parallel runner waiting until timeout due to:", te);
+        if (exception == null) {
+          exception = new IOException(te.getCause());
+        }
       }
     }
     if (wasInterrupted) {
@@ -389,6 +398,13 @@ public class ParallelRunner implements Closeable {
     futures.clear();
   }
 
+  /**
+   * Wait for all submitted tasks by delegating to {@link #waitForTasks(long)} with the default timeout of 10000 milliseconds.
+   */
+  public void waitForTasks() throws IOException{
+    this.waitForTasks(DEFAULT_PARALLEL_RUNNER_WAIT_ON_FINISH_TIMEOUT);
+  }
+
   @Override
   public void close() throws IOException {
     try {
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/ParallelRunnerTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/ParallelRunnerTest.java
index d2cd06e..6e11944 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/ParallelRunnerTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/ParallelRunnerTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -191,9 +192,18 @@ public class ParallelRunnerTest {
     Assert.assertEquals(actual.toString(), expected);
   }
 
-  @Test(groups = { "ignore" })
+  /**
+   * A test verifying that {@link ParallelRunner#waitForTasks(long)} waits for the submitted tasks and fails once the timeout elapses.
+   * Positive case: Both files get mock-deleted and the wait returns without an exception.
+   * Negative case: Countdown latches make the two mocked file deletions wait on each other, constructing a deadlock.
+   * The src2 deletion cannot finish because it waits for src1's deletion, and vice versa.
+   * {@link ParallelRunner#waitForTasks(long)} therefore times out, and the test expects to catch the resulting {@link IOException}.
+   */
   public void testWaitsForFuturesWhenClosing() throws IOException, InterruptedException {
+
+    // Indicates whether the deadlock between the two deletions is in effect.
     final AtomicBoolean flag = new AtomicBoolean();
+    flag.set(true);
     final CountDownLatch latch1 = new CountDownLatch(1);
     final CountDownLatch latch2 = new CountDownLatch(1);
     Path src1 = new Path("/src/file1.txt");
@@ -203,23 +213,23 @@ public class ParallelRunnerTest {
     Mockito.when(fs.delete(src1, true)).thenAnswer(new Answer<Boolean>() {
       @Override
       public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        if (flag.get()) {
+          latch2.await();
+        }
         latch1.countDown();
-        return false;
+        return true;
       }
     });
     Mockito.when(fs.exists(src2)).thenReturn(true);
+
+    /** Timeout in milliseconds passed to waitForTasks, so the blocked file deletions time out after 5 seconds. */
+    final int timeout = 5000;
     Mockito.when(fs.delete(src2, true)).thenAnswer(new Answer<Boolean>() {
       @Override
       public Boolean answer(InvocationOnMock invocation) throws Throwable {
-        latch1.await();
-        long end = System.currentTimeMillis() + 70000;
-        while (System.currentTimeMillis() < end) {
-          try {
-            Thread.sleep(Math.max(1, end - System.currentTimeMillis()));
-          } catch (Exception ignored) {
-          }
+        if (flag.get()) {
+          latch1.await();
         }
-        flag.set(true);
         latch2.countDown();
         return true;
       }
@@ -228,19 +238,30 @@ public class ParallelRunnerTest {
     boolean caughtException = false;
     ParallelRunner parallelRunner = new ParallelRunner(2, fs);
     try {
-      parallelRunner.deletePath(src1, true);
       parallelRunner.deletePath(src2, true);
-      System.out.println(System.currentTimeMillis() + ": START - ParallelRunner.close()");
-      parallelRunner.close();
+      parallelRunner.deletePath(src1, true);
+      parallelRunner.waitForTasks(timeout);
     } catch (IOException e) {
       caughtException = true;
     }
     Assert.assertTrue(caughtException);
-    System.out.println(System.currentTimeMillis() + ": END   - ParallelRunner.close()");
-    System.out.println(System.currentTimeMillis() + ": Waiting for unkillable task to finish...");
-    latch2.await();
-    System.out.println(System.currentTimeMillis() + ": Unkillable task completed.");
-    Assert.assertTrue(flag.get());
+    Assert.assertEquals(latch2.getCount(), 1);
+    Assert.assertEquals(latch1.getCount(), 1);
+
+    // Remove deadlock
+    flag.set(false);
+    caughtException = false;
+    parallelRunner = new ParallelRunner(2, fs);
+    try {
+      parallelRunner.deletePath(src2, true);
+      parallelRunner.deletePath(src1, true);
+      parallelRunner.waitForTasks(timeout);
+    } catch (IOException e) {
+      caughtException = true;
+    }
+    Assert.assertFalse(caughtException);
+    Assert.assertEquals(latch2.getCount(), 0);
+    Assert.assertEquals(latch1.getCount(), 0);
   }
 
   @AfterClass