You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/06/02 22:57:30 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1078][RETRY TASK INITIALIZATION] Adding condition to ensure cancellation happened after run

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 469a56d  [GOBBLIN-1078][RETRY TASK INITIALIZATION] Adding condition to ensure cancellation happened after run
469a56d is described below

commit 469a56d9161c404c6dc73e8fe927fe2bb8922ddd
Author: autumnust <le...@linkedin.com>
AuthorDate: Tue Jun 2 15:56:59 2020 -0700

    [GOBBLIN-1078][RETRY TASK INITIALIZATION] Adding condition to ensure cancellation happened after run
    
    Add unit test for normal sequence
    
    Fix cancellation unit test by increasing the sleep
    time as a temporary solution
    
    Add missing constants after rebasing
    
    Bring back the piece that was lost during rebasing,
    to fix the Travis build
    
    Address reviewer's comments
    
    Update .gitignore for generated files
    
    Closes #2919 from autumnust/taskCancelLock
---
 .gitignore                                         |   4 +
 .../apache/gobblin/cluster/GobblinHelixTask.java   |  17 +--
 .../org/apache/gobblin/cluster/SingleTask.java     |  59 ++++++++--
 .../org/apache/gobblin/cluster/SleepingTask.java   |   3 +-
 .../gobblin/cluster/ClusterIntegrationTest.java    |  53 ++++++---
 .../org/apache/gobblin/cluster/TestSingleTask.java | 129 ++++++++++++++++++++-
 .../cluster/suite/IntegrationJobCancelSuite.java   |   9 ++
 7 files changed, 236 insertions(+), 38 deletions(-)

diff --git a/.gitignore b/.gitignore
index d07dada..4135edf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -80,3 +80,7 @@ FsDatasetStateStoreTest/
 GobblinHelixTaskTest/
 commit-sequence-store-test/
 gobblin-test-harness/src/test/resources/runtime_test/state_store/
+gobblin-integration-test-work-dir/
+
+gobblin-test-utils/src/main/gen-avro/
+gobblin-test-utils/src/main/gen-proto/
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index 21e420d..50dc5b3 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -19,9 +19,13 @@ package org.apache.gobblin.cluster;
 
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.Id;
 import org.apache.gobblin.util.retry.RetryerFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,14 +45,6 @@ import com.typesafe.config.ConfigValueFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.TaskState;
-import org.apache.gobblin.runtime.util.StateStores;
-import org.apache.gobblin.source.workunit.MultiWorkUnit;
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.util.Id;
-
 
 /**
  * An implementation of Helix's {@link org.apache.helix.task.Task} that wraps and runs one or more Gobblin
@@ -65,7 +61,6 @@ import org.apache.gobblin.util.Id;
  *   a file that will be collected by the {@link GobblinHelixJobLauncher} later upon completion of the job.
  * </p>
  */
-@Alpha
 @Slf4j
 public class GobblinHelixTask implements Task {
 
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 9dd0a1c..f3f8875 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -20,12 +20,17 @@ package org.apache.gobblin.cluster;
 import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -40,6 +45,7 @@ import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.util.StateStores;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.SerializationUtils;
 
@@ -51,7 +57,11 @@ public class SingleTask {
 
   private static final Logger _logger = LoggerFactory.getLogger(SingleTask.class);
 
-  private GobblinMultiTaskAttempt _taskAttempt;
+  public static final String MAX_RETRY_WAITING_FOR_INIT_KEY = "maxRetryBlockedOnTaskAttemptInit";
+  public static final int DEFAULT_MAX_RETRY_WAITING_FOR_INIT = 2;
+
+  @VisibleForTesting
+  GobblinMultiTaskAttempt _taskAttempt;
   private String _jobId;
   private Path _workUnitFilePath;
   private Path _jobStateFilePath;
@@ -61,6 +71,12 @@ public class SingleTask {
   private Config _dynamicConfig;
   private JobState _jobState;
 
+  // Prevents Helix from calling cancel before taskAttempt is created.
+  // Checking whether taskAttempt is null is not enough: the canceller runs in a different thread from the runner, so
+  // the case to avoid is taskAttempt being created and starting to run after cancel has been called.
+  private Condition _taskAttemptBuilt;
+  private Lock _lock;
+
   /**
    * Do all heavy-lifting of initialization in constructor which could be retried if failed,
    * see the example in {@link GobblinHelixTask}.
@@ -75,11 +91,12 @@ public class SingleTask {
     _stateStores = stateStores;
     _dynamicConfig = dynamicConfig;
     _jobState = getJobState();
+    _lock = new ReentrantLock();
+    _taskAttemptBuilt = _lock.newCondition();
   }
 
   public void run()
       throws IOException, InterruptedException {
-    List<WorkUnit> workUnits = getWorkUnits();
 
     // Add dynamic configuration to the job state
     _dynamicConfig.entrySet().forEach(e -> _jobState.setProp(e.getKey(), e.getValue().unwrapped().toString()));
@@ -93,7 +110,18 @@ public class SingleTask {
         .createDefaultTopLevelBroker(jobConfig, GobblinScopeTypes.GLOBAL.defaultScopeInstance())) {
       SharedResourcesBroker<GobblinScopeTypes> jobBroker = getJobBroker(_jobState, globalBroker);
 
-      _taskAttempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId, _jobState, jobBroker);
+      // Build taskAttempt first, then (under the lock) signal a thread blocked in cancel(), if any,
+      // now that taskAttempt is non-null.
+      _taskAttempt = _taskAttemptBuilder.build(getWorkUnits().iterator(), _jobId, _jobState, jobBroker);
+
+      _lock.lock();
+      try {
+        _taskAttemptBuilt.signal();
+      } finally {
+        _lock.unlock();
+      }
+
+      // This is a blocking call.
       _taskAttempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
 
     } finally {
@@ -164,16 +192,31 @@ public class SingleTask {
   }
 
   public void cancel() {
-    if (_taskAttempt != null) {
+    int retryCount = 0 ;
+    int maxRetry = ConfigUtils.getInt(_dynamicConfig, MAX_RETRY_WAITING_FOR_INIT_KEY, DEFAULT_MAX_RETRY_WAITING_FOR_INIT);
+
+    try {
+      _lock.lock();
       try {
+        while (_taskAttempt == null) {
+          // await returns false if this round timed out
+          if (!_taskAttemptBuilt.await(5, TimeUnit.SECONDS) && ++retryCount > maxRetry) {
+            throw new IllegalStateException("Failed to initialize taskAttempt object before cancel");
+          }
+        }
+      } finally {
+        _lock.unlock();
+      }
+
+      if (_taskAttempt != null) {
         _logger.info("Task cancelled: Shutdown starting for tasks with jobId: {}", _jobId);
         _taskAttempt.shutdownTasks();
         _logger.info("Task cancelled: Shutdown complete for tasks with jobId: {}", _jobId);
-      } catch (InterruptedException e) {
-        throw new RuntimeException("Interrupted while shutting down task with jobId: " + _jobId, e);
+      } else {
+        throw new IllegalStateException("TaskAttempt not initialized while passing conditional barrier");
       }
-    } else {
-      _logger.error("Task cancelled but _taskattempt is null, so ignoring.");
+    } catch (InterruptedException e) {
+      throw new RuntimeException("Interrupted while shutting down task with jobId: " + _jobId, e);
     }
   }
 }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
index 8d06b70..95a2b4e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
@@ -32,6 +32,7 @@ import org.apache.gobblin.runtime.task.BaseAbstractTask;
 @Slf4j
 public class SleepingTask extends BaseAbstractTask {
   public static final String TASK_STATE_FILE_KEY = "task.state.file.path";
+  public static final String SLEEP_TIME_IN_SECONDS = "data.publisher.sleep.time.in.seconds";
 
   private final long sleepTime;
   private File taskStateFile;
@@ -39,7 +40,7 @@ public class SleepingTask extends BaseAbstractTask {
   public SleepingTask(TaskContext taskContext) {
     super(taskContext);
     TaskState taskState = taskContext.getTaskState();
-    sleepTime = taskState.getPropAsLong("data.publisher.sleep.time.in.seconds", 10L);
+    sleepTime = taskState.getPropAsLong(SLEEP_TIME_IN_SECONDS, 10L);
     taskStateFile = new File(taskState.getProp(TASK_STATE_FILE_KEY));
     try {
       if (taskStateFile.exists()) {
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index 5d214a2..e34c0f2 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -21,6 +21,9 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
@@ -38,6 +41,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Predicate;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -80,28 +84,49 @@ public class ClusterIntegrationTest {
 
   @Test
   void testJobShouldGetCancelled() throws Exception {
+    // Cancellation usually takes a long time to complete, so set the sleeping time to 100 seconds.
     Config jobConfigOverrides = ClusterIntegrationTestUtils.buildSleepingJob(IntegrationJobCancelSuite.JOB_ID,
-        IntegrationJobCancelSuite.TASK_STATE_FILE);
-    this.suite =new IntegrationJobCancelSuite(jobConfigOverrides);
+        IntegrationJobCancelSuite.TASK_STATE_FILE)
+        .withValue(SleepingTask.SLEEP_TIME_IN_SECONDS, ConfigValueFactory.fromAnyRef(100));
+    this.suite = new IntegrationJobCancelSuite(jobConfigOverrides);
     HelixManager helixManager = getHelixManager();
     suite.startCluster();
     helixManager.connect();
 
-    TaskDriver taskDriver = new TaskDriver(helixManager);
-
-    //Ensure that Helix has created a workflow
-    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
-        assertTrue(isTaskStarted(helixManager, IntegrationJobCancelSuite.JOB_ID), "Waiting for the job to start...");
-
-    //Ensure that the SleepingTask is running
-    AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
-        assertTrue(isTaskRunning(IntegrationJobCancelSuite.TASK_STATE_FILE),"Waiting for the task to enter running state");
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Runnable cancelAfterTaskInit = () -> {
+      try {
+        TaskDriver taskDriver = new TaskDriver(helixManager);
+        // The actual cancellation needs to be executed in a separate thread so that Helix's cancel is not blocked by
+        // the SleepingTask running in its own thread.
+        // Issue the cancel after ensuring the workflow is created and the SleepingTask is running.
+        AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
+            assertTrue(isTaskStarted(helixManager, IntegrationJobCancelSuite.JOB_ID), "Waiting for the job to start...");
+
+        AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
+            assertTrue(isTaskRunning(IntegrationJobCancelSuite.TASK_STATE_FILE),
+                "Waiting for the task to enter running state");
+
+        log.info("Stopping the job");
+        taskDriver.stop(IntegrationJobCancelSuite.JOB_ID);
+        suite.shutdownCluster();
+      } catch (Exception e) {
+        throw new RuntimeException("Failure in canceling tasks");
+      }
+    };
 
-    log.info("Stopping the job");
-    taskDriver.stop(IntegrationJobCancelSuite.JOB_ID);
+    FutureTask<String> futureTask = new FutureTask<String>( cancelAfterTaskInit, "cancelled");
+    executor.submit(futureTask);
 
-    suite.shutdownCluster();
+    AssertWithBackoff assertWithBackoff = AssertWithBackoff.create().backoffFactor(1).maxSleepMs(1000).timeoutMs(500000);
+    assertWithBackoff.assertTrue(new Predicate<Void>() {
+      @Override
+      public boolean apply(Void input) {
+        return futureTask.isDone();
+      }
+    }, "waiting for future to complete");
 
+    Assert.assertEquals(futureTask.get(), "cancelled");
     suite.waitForAndVerifyOutputFiles();
   }
 
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
index 04a800c..06b0232 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
@@ -21,17 +21,26 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 
+import org.apache.gobblin.testing.AssertWithBackoff;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.FileUtils;
 import org.junit.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.FileUtils;
+import javax.annotation.Nullable;
+
+import static org.apache.gobblin.cluster.SingleTask.MAX_RETRY_WAITING_FOR_INIT_KEY;
 
 
 /**
@@ -64,13 +73,125 @@ public class TestSingleTask {
    * re-run it again.
    */
   @Test
-  public void testSingleTaskRerunAfterFailure() throws Exception {
-    InMemorySingleTaskRunner inMemorySingleTaskRunner = createInMemoryTaskRunner();
+  public void testSingleTaskRerunAfterFailure()
+      throws Exception {
+    SingleTaskRunner inMemorySingleTaskRunner = createInMemoryTaskRunner();
     try {
       inMemorySingleTaskRunner.run(true);
     } catch (Exception e) {
       inMemorySingleTaskRunner.run();
     }
+
     Assert.assertTrue(true);
   }
+
+  @Test
+  public void testTaskCancelBeforeRunFailure() throws Exception {
+    InMemorySingleTaskRunner inMemorySingleTaskRunner = createInMemoryTaskRunner();
+    inMemorySingleTaskRunner.initClusterSingleTask(false);
+
+    // Calling cancel directly without initializing taskAttempt: it keeps timing out until it reaches the illegal
+    // state defined in SingleTask.
+    try {
+      inMemorySingleTaskRunner.task.cancel();
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof IllegalStateException);
+      Assert.assertTrue(e.getMessage().contains("Failed to initialize"));
+    }
+  }
+
+  // Normal sequence means run is executed before the cancel method.
+  @Test
+  public void testNormalSequence() throws Exception {
+    InMemorySingleTaskRunner inMemorySingleTaskRunner = createInMemoryTaskRunner();
+
+    inMemorySingleTaskRunner.startServices();
+    inMemorySingleTaskRunner.initClusterSingleTask(false);
+    final SingleTask task = inMemorySingleTaskRunner.task;
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+
+    Runnable cancelRunnable = () -> {
+      try {
+        task.cancel();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+    final FutureTask<String> cancelTask = new FutureTask<String>(cancelRunnable, "cancelled");
+
+    Runnable runRunnable = () -> {
+      try {
+        task.run();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+
+    FutureTask<String> runTask = new FutureTask<String>(runRunnable, "completed");
+    executor.submit(runTask);
+    AssertWithBackoff.create().timeoutMs(2000).backoffFactor(1).assertTrue(new Predicate<Void>() {
+                                                                             @Override
+                                                                             public boolean apply(@Nullable Void input) {
+                                                                               return task._taskAttempt != null;
+                                                                             }
+                                                                           }, "wait until task attempt available");
+
+    // Simulate the case where the signal has already happened before cancel is called.
+    executor.submit(cancelTask);
+
+    AssertWithBackoff.create().timeoutMs(2000).backoffFactor(1).assertTrue(new Predicate<Void>() {
+      @Override
+      public boolean apply(@Nullable Void input) {
+        return cancelTask.isDone();
+      }
+    }, "wait until task attempt available");
+    Assert.assertEquals(cancelTask.get(), "cancelled");
+  }
+
+  @Test
+  public void testTaskCancelBeforeRun()
+      throws Exception {
+    final InMemorySingleTaskRunner inMemorySingleTaskRunner = createInMemoryTaskRunner();
+
+    // Put cancellation into an (effectively) infinite wait while another thread initializes the taskAttempt.
+    // Reset the task and set the retry count to be effectively infinite.
+    inMemorySingleTaskRunner
+        .setInjectedConfig(ConfigFactory.parseMap(ImmutableMap.of(MAX_RETRY_WAITING_FOR_INIT_KEY, Integer.MAX_VALUE)));
+    inMemorySingleTaskRunner.startServices();
+    inMemorySingleTaskRunner.initClusterSingleTask(false);
+    final SingleTask task = inMemorySingleTaskRunner.task;
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+
+    Runnable cancelRunnable = () -> {
+      try {
+        task.cancel();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+    FutureTask<String> cancelTask = new FutureTask<String>(cancelRunnable, "cancelled");
+    executor.submit(cancelTask);
+
+    Runnable runRunnable = () -> {
+      try {
+        task.run();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+
+    FutureTask<String> runTask = new FutureTask<String>(runRunnable, "completed");
+    executor.submit(runTask);
+
+    AssertWithBackoff assertWithBackoff = AssertWithBackoff.create().backoffFactor(1).maxSleepMs(1000).timeoutMs(500000);
+    assertWithBackoff.assertTrue(new Predicate<Void>() {
+      @Override
+      public boolean apply(@Nullable Void input) {
+        return runTask.isDone();
+      }
+    }, "waiting for future to complete");
+    Assert.assertEquals(runTask.get(), "completed");
+    Assert.assertTrue(cancelTask.isDone());
+    Assert.assertEquals(cancelTask.get(), "cancelled");
+  }
 }
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
index b16febb..6bb3502 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
@@ -25,6 +25,15 @@ import com.typesafe.config.Config;
 public class IntegrationJobCancelSuite extends IntegrationBasicSuite {
   public static final String JOB_ID = "job_HelloWorldTestJob_1234";
   public static final String TASK_STATE_FILE = "/tmp/IntegrationJobCancelSuite/taskState/_RUNNING";
+  private int sleepingTime = 10;
+
+  public IntegrationJobCancelSuite() {
+    // For backward compatibility.
+  }
+
+  public IntegrationJobCancelSuite(int sleepingTime) {
+    this.sleepingTime = sleepingTime;
+  }
 
   public IntegrationJobCancelSuite(Config jobConfigOverrides) {
     super(jobConfigOverrides);