Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/03/04 01:25:15 UTC

[GitHub] [incubator-gobblin] autumnust opened a new pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

autumnust opened a new pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - https://issues.apache.org/jira/browse/GOBBLIN-1068
   
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if applicable):
   The original `cancel` method in `Task.java` has several problems:
   -- `Task` implements `Runnable`, yet it also holds a reference to the `Future<?>` that is returned when that `Runnable` is submitted. The lifecycle of a `Runnable` shouldn't be controlled by the `Runnable` itself, as it currently is; it should be controlled externally, e.g. by `TaskExecutor`.
   -- Because `Task` holds a `Future<?>`, there is a bug: `TaskIFaceWrapper`, which extends `Task`, has no constructor that initializes this field, yet one of its public methods (`cancel`, through its call to `completeShutdown`) accesses it. The earlier fix in https://github.com/apache/incubator-gobblin/pull/2157/files worked around this by copying the base class's method and removing the call to `completeShutdown`, which is the wrong approach. In fact, the `Future` object does not need to exist in `Task` at all.
   -- This PR also fixes some typos and unit-test problems.
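
To make the ownership argument concrete, here is a minimal sketch, assuming plain `java.util.concurrent` in place of Gobblin's `TaskExecutor`: the task holds no `Future`, and the owner that submitted it keeps the `Future`s and drives cancellation. `SketchTask` and `SketchTaskRunner` are hypothetical names, not Gobblin classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical task: it knows how to run, but holds no reference to its own Future.
class SketchTask implements Runnable {
  @Override
  public void run() {
    try {
      // Simulate long-running work that only ends when interrupted.
      while (true) {
        Thread.sleep(10L);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
    }
  }
}

// Hypothetical owner: it submits tasks and keeps the Futures, so cancellation
// flows from the owner to the tasks and never from a task back to itself.
class SketchTaskRunner {
  private final ExecutorService executor = Executors.newFixedThreadPool(2, r -> {
    Thread t = new Thread(r);
    t.setDaemon(true); // do not keep the JVM alive in this sketch
    return t;
  });
  private final List<Future<?>> futures = new ArrayList<>();

  void submit(Runnable task) {
    futures.add(executor.submit(task));
  }

  /** Cancels every submitted task and waits up to timeoutMs for the pool to stop. */
  boolean cancelAll(long timeoutMs) {
    for (Future<?> future : futures) {
      future.cancel(true); // true = interrupt the task if it is already running
    }
    executor.shutdown();
    try {
      return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  List<Future<?>> futures() {
    return futures;
  }
}
```

Because only the runner sees the `Future`s, a subclass of the task type has nothing extra to initialize for cancellation to work.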
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404662868
 
 

 ##########
 File path: gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
 ##########
 @@ -167,9 +171,20 @@ public void testJobRestartViaSpec() throws Exception {
       return recordNew == null || targetStateNew.equals(TargetState.STOP.name());
     }, "Waiting for Workflow TargetState to be STOP");
 
-    //Ensure that the SleepingTask did not terminate normally i.e. it was interrupted. We check this by ensuring
-    // that the line "Hello World!" is not present in the logged output.
-    suite.waitForAndVerifyOutputFiles();
+    // Ensure that the SleepingTask did not terminate normally i.e. it was interrupted. We check this by ensuring
+    // that the line "Hello World!" is not present in the logged output but only "Sleeping interrupted"
+    // It is important to have back-off here since ZNode STOP state doesn't mean the actual sleeping task has been terminated
+    // since shutdown of task is async.
+    AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(120000).backoffFactor(1).assertTrue(input -> {
 
 Review comment:
   should be `maxSleepMs(1000L).timeoutMs(120000L)`
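
For context on the `maxSleepMs`/`timeoutMs` arguments: a backoff-based wait polls a condition, sleeping between polls with a growing but capped interval, until the condition holds or a timeout expires. The sketch below is a hypothetical, self-contained stand-in written for illustration; it is not Gobblin's `AssertWithBackoff`, and its names and parameters are assumptions. Its durations are `long`, which is why long literals such as `120000L` read naturally at call sites.

```java
import java.util.function.BooleanSupplier;

// Hypothetical backoff wait; NOT Gobblin's AssertWithBackoff.
final class BackoffPoller {
  private BackoffPoller() {}

  /**
   * Polls the condition until it is true or timeoutMs elapses. The pause between polls starts at
   * initialSleepMs, is multiplied by backoffFactor each round, and is capped at maxSleepMs.
   * Returns true if the condition became true in time.
   */
  static boolean await(BooleanSupplier condition, long initialSleepMs, long maxSleepMs,
      double backoffFactor, long timeoutMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    long sleepMs = Math.max(1L, initialSleepMs);
    try {
      while (!condition.getAsBoolean()) {
        long remainingMs = deadline - System.currentTimeMillis();
        if (remainingMs <= 0) {
          return false; // timed out
        }
        Thread.sleep(Math.min(Math.min(sleepMs, maxSleepMs), remainingMs));
        // Grow the pause, but never beyond maxSleepMs and never below 1 ms.
        sleepMs = Math.max(1L, Math.min(maxSleepMs, (long) (sleepMs * backoffFactor)));
      }
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // let the caller see the interrupt
      return false;
    }
  }
}
```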

[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404646726
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
 ##########
 @@ -19,27 +19,33 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.gobblin.runtime.TaskContext;
 
 Review comment:
   It seems you are using the wrong code-style file, which is messing up the imports in all of your PRs.

[GitHub] [incubator-gobblin] autumnust commented on issue #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
autumnust commented on issue #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#issuecomment-594799684
 
 
   > @sv2000 Can you please take a look whenever you have time? Thanks.
   
   Looks like there's a Travis failure unrelated to this code change, which I couldn't reproduce locally. I will trigger another build after addressing any comments.

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404220767
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 ##########
 @@ -169,25 +174,35 @@ public void run()
       }
     } catch (InterruptedException interrupt) {
       log.info("Job interrupted by InterrupedException.");
-      interruptTaskExecution(countDownLatch);
+      interruptTaskExecution(Optional.of(countDownLatch));
     }
     log.info("All assigned tasks of job {} have completed in container {}", jobId, containerIdOptional.or(""));
   }
 
-  private void interruptTaskExecution(CountDownLatch countDownLatch)
-      throws InterruptedException {
+  /**
+   * A helper function that that shuts down all outstanding tasks and
+   * shuts down the taskExecutor if it times out on a task termination.
+   */
+  private void interruptTaskExecution(Optional<CountDownLatch> countDownLatch) throws InterruptedException {
     log.info("Job interrupted. Attempting a graceful shutdown of the job.");
-    this.tasks.forEach(Task::shutdown);
-    if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
-      log.warn("Graceful shutdown of job timed out. Killing all outstanding tasks.");
-      try {
+    this.shutdownTasks();
+    try {
+      if (!countDownLatch.isPresent() || !countDownLatch.get().await(5, TimeUnit.SECONDS)) {
+        log.warn("Shutting down TaskExecutor. Killing all outstanding tasks.");
         this.taskExecutor.shutDown();
-      } catch (Throwable t) {
-        throw new RuntimeException("Failed to shutdown task executor.", t);
       }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to shutdown task executor.", e);
     }
   }
 
+  /**
+   * Shutting down the whole {@link GobblinMultiTaskAttempt} by shutting down all its outstanding tasks and taskExecutor.
+   */
+  public void cancel() throws InterruptedException{
 
 Review comment:
   Nit: Add a whitespace after InterruptedException.

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r387990181
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 ##########
 @@ -160,25 +163,40 @@ public void run()
       }
     } catch (InterruptedException interrupt) {
       log.info("Job interrupted by InterrupedException.");
-      interruptTaskExecution(countDownLatch);
+      interruptTaskExecution(Optional.of(countDownLatch));
     }
     log.info("All assigned tasks of job {} have completed in container {}", jobId, containerIdOptional.or(""));
   }
 
-  private void interruptTaskExecution(CountDownLatch countDownLatch)
-      throws InterruptedException {
+  /**
+   * A helper function that shutting down all outstanding tasks and destroy taskExecutor if timeout on waiting for
+   * certain tasks' termination.
+   */
+  private void interruptTaskExecution(Optional<CountDownLatch> countDownLatch) throws InterruptedException {
     log.info("Job interrupted. Attempting a graceful shutdown of the job.");
-    this.tasks.forEach(Task::shutdown);
-    if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
-      log.warn("Graceful shutdown of job timed out. Killing all outstanding tasks.");
-      try {
+    this.shutdownTasks();
+    try {
+      if (countDownLatch.isPresent()) {
+        if (!countDownLatch.get().await(5, TimeUnit.SECONDS)) {
+          log.warn("Graceful shutdown of job timed out. Killing all outstanding tasks.");
+          this.taskExecutor.shutDown();
 
 Review comment:
   Can this call be moved out of the if/else so that it isn't duplicated in both branches?
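
One way to address this, sketched under assumptions: compute a single boolean for "the tasks did not drain in time" and force the executor down from exactly one place. `InterruptSketch`, its counter, and treating a `null` latch as "nothing to wait for" are all hypothetical; this is not the PR's final code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one call site for the forced executor shutdown.
final class InterruptSketch {
  int executorShutdowns = 0; // counts forced shutdowns so the behavior can be checked

  private void shutdownTasks() {
    // Ask every task to stop cooperatively (omitted in this sketch).
  }

  private void shutDownExecutor() {
    executorShutdowns++;
  }

  /** latch may be null, meaning there is nothing to wait for. */
  void interruptTaskExecution(CountDownLatch latch, long graceMs) {
    shutdownTasks();
    boolean drainedInTime = false;
    if (latch != null) {
      try {
        drainedInTime = latch.await(graceMs, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // treat an interrupt as "did not drain"
      }
    }
    if (!drainedInTime) {
      shutDownExecutor(); // the only place the executor is forced down
    }
  }
}
```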

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404236184
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 ##########
 @@ -152,13 +155,15 @@ public void run()
     }
 
     CountUpAndDownLatch countDownLatch = new CountUpAndDownLatch(0);
-    this.tasks = runWorkUnits(countDownLatch);
+    Pair<List<Task>, List<Future<?>>> taskExecutionResult = runWorkUnits(countDownLatch);
+    this.tasks = taskExecutionResult.getLeft();
+    this.taskFutures = taskExecutionResult.getRight();
     log.info("Waiting for submitted tasks of job {} to complete in container {}...", jobId, containerIdOptional.or(""));
     try {
       while (countDownLatch.getCount() > 0) {
         if (this.interruptionPredicate.test(this)) {
           log.info("Interrupting task execution due to satisfied predicate.");
-          interruptTaskExecution(countDownLatch);
+          interruptTaskExecution(Optional.of(countDownLatch));
 
 Review comment:
   Also: should we just call cancel here?
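
A minimal sketch of what "just call cancel here" could look like, assuming `cancel()` is the single teardown entry point (cancel the `Future`s, then stop the executor) and the wait loop simply delegates to it when the interruption predicate fires. `AttemptSketch` and its members are hypothetical stand-ins for `GobblinMultiTaskAttempt`, built on plain `java.util.concurrent`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a multi-task attempt.
final class AttemptSketch {
  private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r);
    t.setDaemon(true); // do not keep the JVM alive in this sketch
    return t;
  });
  private final List<Future<?>> futures = new ArrayList<>();
  volatile boolean cancelled = false;

  void submit(Runnable task) {
    futures.add(executor.submit(task));
  }

  /** Single teardown path: cancel (and interrupt) every task, then stop the executor. */
  void cancel() {
    cancelled = true;
    futures.forEach(f -> f.cancel(true));
    executor.shutdownNow();
  }

  /** Waits for all tasks; if the predicate fires first, delegates to cancel(). */
  void waitForCompletion(BooleanSupplier interruptionPredicate, long pollMs) {
    try {
      while (!allDone()) {
        if (interruptionPredicate.getAsBoolean()) {
          cancel();
          return;
        }
        Thread.sleep(pollMs);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      cancel();
    }
  }

  boolean allDone() {
    return futures.stream().allMatch(Future::isDone);
  }
}
```

Routing both the predicate path and the interrupt path through `cancel()` keeps the teardown logic in one method.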

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r387985305
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
 ##########
 @@ -19,27 +19,37 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.task.BaseAbstractTask;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
 import com.google.common.io.Files;
 
+import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.runtime.TaskContext;
-import org.apache.gobblin.runtime.TaskState;
-import org.apache.gobblin.runtime.task.BaseAbstractTask;
 
 @Slf4j
 public class SleepingTask extends BaseAbstractTask {
   public static final String TASK_STATE_FILE_KEY = "task.state.file.path";
+  public static final String SLEEPING_TASK_SLEEP_TIME = "data.publisher.sleep.time.in.seconds";
 
   private final long sleepTime;
   private File taskStateFile;
+  // If sleepTime is configured to be a negative value, it indicates user wants to put it in long sleep until
+  // external interruption.
+  private boolean infiniteSleep;
 
   public SleepingTask(TaskContext taskContext) {
     super(taskContext);
     TaskState taskState = taskContext.getTaskState();
-    sleepTime = taskState.getPropAsLong("data.publisher.sleep.time.in.seconds", 10L);
+    sleepTime = taskState.getPropAsLong(SLEEPING_TASK_SLEEP_TIME, 10L);
+    infiniteSleep = sleepTime < 0;
 
 Review comment:
   infiniteSleep -> isInfiniteSleep?

[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404661941
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
 ##########
 @@ -64,19 +70,27 @@ public void run() {
         throw new IOException("File creation error: " + taskStateFile.getName());
       }
       long endTime = System.currentTimeMillis() + sleepTime * 1000;
-      while (System.currentTimeMillis() <= endTime) {
-        Thread.sleep(1000L);
-        log.warn("Sleeping for {} seconds", sleepTime);
-      }
+
+      AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(Integer.MAX_VALUE).backoffFactor(1).
 
 Review comment:
   What's the point of setting timeoutMs(Integer.MAX_VALUE)?
   
   I think AssertWithBackoff should replace Thread.sleep only when the wait time is not known in advance, e.g. when we expect something to become true (or something to happen) but don't know when, and we don't want to sleep for a fixed amount of time that could be 1) too long, which would delay the build, or 2) too short, so that what we were expecting hasn't happened yet.
   
   I know AssertWithBackoff is recommended in the Gobblin team, but here the sleep time is fixed, so AssertWithBackoff makes no sense.
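
Following this suggestion, a fixed-duration wait needs nothing more than `Thread.sleep` in a loop, and the infinite-sleep mode used by the cancellation test can be expressed the same way by sleeping until interrupted. The sketch below is a hypothetical simplification of the `SleepingTask` loop, not the PR's code; it takes milliseconds rather than seconds so it is quick to exercise, and treats a negative duration as "sleep until interrupted", mirroring the `infiniteSleep` flag in the diff.

```java
// Hypothetical simplification of SleepingTask's wait; not the PR's code.
final class SleepingLoopSketch {
  private SleepingLoopSketch() {}

  /**
   * Sleeps for sleepMs, or until interrupted when sleepMs is negative.
   * Returns true if the sleep ran to completion, false if it was interrupted.
   */
  static boolean sleep(long sleepMs) {
    boolean isInfiniteSleep = sleepMs < 0;
    long endTime = System.currentTimeMillis() + Math.max(0L, sleepMs);
    try {
      while (isInfiniteSleep || System.currentTimeMillis() < endTime) {
        long remainingMs = endTime - System.currentTimeMillis();
        // Sleep in chunks of at most one second, as the original loop did.
        Thread.sleep(isInfiniteSleep ? 1000L : Math.max(1L, Math.min(1000L, remainingMs)));
      }
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }
}
```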

[GitHub] [incubator-gobblin] autumnust commented on issue #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
autumnust commented on issue #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#issuecomment-594266598
 
 
   @sv2000  Can you please take a look whenever you have time? Thanks. 

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r387988894
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 ##########
 @@ -160,25 +163,40 @@ public void run()
       }
     } catch (InterruptedException interrupt) {
       log.info("Job interrupted by InterrupedException.");
-      interruptTaskExecution(countDownLatch);
+      interruptTaskExecution(Optional.of(countDownLatch));
     }
     log.info("All assigned tasks of job {} have completed in container {}", jobId, containerIdOptional.or(""));
   }
 
-  private void interruptTaskExecution(CountDownLatch countDownLatch)
-      throws InterruptedException {
+  /**
+   * A helper function that shutting down all outstanding tasks and destroy taskExecutor if timeout on waiting for
 
 Review comment:
   Change to "...that shuts down all outstanding tasks and shuts down the taskExecutor if it times out on a task termination"

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r387987780
 
 

 ##########
 File path: gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
 ##########
 @@ -40,15 +42,23 @@
         ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource",
         ConfigurationKeys.JOB_ID_KEY, JOB_ID,
         GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
-        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L, SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE))
-        .withFallback(rawJobConfig);
+        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L,
+
+        // Put SleepingTask in infinite sleep so that cancellation thereby ensuring cancellation to happen.
+        SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE))
+        .withFallback(rawJobConfig.withValue(SLEEPING_TASK_SLEEP_TIME, ConfigValueFactory.fromAnyRef(-1)));
     return ImmutableMap.of(JOB_NAME, newConfig);
   }
 
+  /**
+   * Note This verification now ties to {@link SleepingTask} which is not ideal.
+   */
   @Override
   public void waitForAndVerifyOutputFiles() throws Exception {
+    // Sleeing task is in infinite sleeping, so unless an cancelling is hitting task execution, this line won't be printed.
 
 Review comment:
   Change comment to: "SleepingTask is in an infinite sleep. The log line is printed only when a cancellation is invoked."

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404205107
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
 ##########
 @@ -167,7 +167,7 @@ public void cancel() {
     if (_taskAttempt != null) {
       try {
         _logger.info("Task cancelled: Shutdown starting for tasks with jobId: {}", _jobId);
 
 Review comment:
   Let's change the log message here. "Cancellation invoked for tasks with jobId: {}" sounds more accurate.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404230388
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskIFaceWrapper.java
 ##########
 @@ -31,6 +28,9 @@
 import org.apache.gobblin.runtime.TaskStateTracker;
 import org.apache.gobblin.runtime.fork.Fork;
 
+import com.google.common.base.Optional;
 
 Review comment:
   Same comment about the import order as earlier.

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r387984837
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
 ##########
 @@ -155,7 +155,7 @@ public void cancel() {
     if (_taskattempt != null) {
       try {
         _logger.info("Task cancelled: Shutdown starting for tasks with jobId: {}", _jobId);
-        _taskattempt.shutdownTasks();
+        _taskattempt.cancelGobblinMultiAttempts();
 
 Review comment:
   Just _taskattempt.cancel()?
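
A minimal sketch of that delegation, assuming the wrapper only needs a null check around a single `cancel()` call on the attempt. `SingleTaskSketch` and its nested `Attempt` interface are hypothetical stand-ins for `SingleTask` and `GobblinMultiTaskAttempt`, `java.util.logging` stands in for the real logger, and the log text is only illustrative.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for SingleTask's cancellation path.
final class SingleTaskSketch {
  private static final Logger LOG = Logger.getLogger(SingleTaskSketch.class.getName());

  /** Hypothetical stand-in for the attempt's teardown API. */
  interface Attempt {
    void cancel() throws InterruptedException;
  }

  private final String jobId;
  private volatile Attempt taskAttempt; // null until the attempt has been created

  SingleTaskSketch(String jobId) {
    this.jobId = jobId;
  }

  void setTaskAttempt(Attempt attempt) {
    this.taskAttempt = attempt;
  }

  /** Delegates straight to the attempt's cancel(); does nothing if no attempt exists yet. */
  void cancel() {
    Attempt attempt = taskAttempt; // read the volatile field once
    if (attempt != null) {
      LOG.info("Cancellation invoked for tasks with jobId: " + jobId);
      try {
        attempt.cancel();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
```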

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r387986073
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
 ##########
 @@ -64,19 +74,27 @@ public void run() {
         throw new IOException("File creation error: " + taskStateFile.getName());
       }
       long endTime = System.currentTimeMillis() + sleepTime * 1000;
-      while (System.currentTimeMillis() <= endTime) {
-        Thread.sleep(1000L);
-        log.warn("Sleeping for {} seconds", sleepTime);
-      }
+
+      AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(Integer.MAX_VALUE).backoffFactor(1).
+          assertTrue(new Predicate<Void>() {
+            @Override
+            public boolean apply(@Nullable Void input) {
+              return System.currentTimeMillis() > endTime && !infiniteSleep;
 
 Review comment:
   Nit: change the order of evaluation to: !infiniteSleep && System.currentTimeMillis() > endTime
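
Both orderings return the same result; the difference is that `&&` short-circuits, so checking the constant flag first skips the clock read entirely in infinite-sleep mode and makes the intent read left to right. A small sketch with an injectable clock (a hypothetical `LongSupplier`, used only to count reads) illustrates this:

```java
import java.util.function.LongSupplier;

// Hypothetical wake-up predicate with an injectable clock.
final class WakeUpCondition {
  private final boolean isInfiniteSleep;
  private final long endTime;
  private final LongSupplier clock;

  WakeUpCondition(boolean isInfiniteSleep, long endTime, LongSupplier clock) {
    this.isInfiniteSleep = isInfiniteSleep;
    this.endTime = endTime;
    this.clock = clock;
  }

  /** The cheap flag is checked first; when it is true, && never evaluates the clock. */
  boolean shouldWake() {
    return !isInfiniteSleep && clock.getAsLong() > endTime;
  }
}
```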

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404219750
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 ##########
 @@ -152,13 +155,15 @@ public void run()
     }
 
     CountUpAndDownLatch countDownLatch = new CountUpAndDownLatch(0);
-    this.tasks = runWorkUnits(countDownLatch);
+    Pair<List<Task>, List<Future<?>>> taskExecutionResult = runWorkUnits(countDownLatch);
+    this.tasks = taskExecutionResult.getLeft();
+    this.taskFutures = taskExecutionResult.getRight();
     log.info("Waiting for submitted tasks of job {} to complete in container {}...", jobId, containerIdOptional.or(""));
     try {
       while (countDownLatch.getCount() > 0) {
         if (this.interruptionPredicate.test(this)) {
           log.info("Interrupting task execution due to satisfied predicate.");
-          interruptTaskExecution(countDownLatch);
+          interruptTaskExecution(Optional.of(countDownLatch));
 
 Review comment:
   Optional is not intended to be used as a function parameter and should be used only as a return value. [Here](https://www.quora.com/Why-is-using-java-util-Optional-as-a-method-parameter-not-recommended) is one of several references describing the problems with using Optional as a function parameter.
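
A common alternative to an `Optional` parameter is a pair of overloads, one per case, so callers never construct an `Optional` just to pass it in. A minimal sketch, assuming the two cases are "a latch is available, wait briefly" and "no latch, force the shutdown"; `OverloadSketch` and its event log are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: overloads instead of an Optional<CountDownLatch> parameter.
final class OverloadSketch {
  final List<String> events = new ArrayList<>(); // records what happened, for checking

  /** No latch: stop the tasks and force the executor down immediately. */
  void interruptTaskExecution() {
    shutdownTasks();
    forceShutdown();
  }

  /** Latch available: force the executor down only if the tasks do not drain in time. */
  void interruptTaskExecution(CountDownLatch latch, long graceMs) {
    shutdownTasks();
    boolean drained = false;
    try {
      drained = latch.await(graceMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // treat an interrupt as a timeout
    }
    if (!drained) {
      forceShutdown();
    }
  }

  private void shutdownTasks() {
    events.add("shutdownTasks");
  }

  private void forceShutdown() {
    events.add("forceShutdown");
  }
}
```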

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404225329
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 ##########
 @@ -169,25 +174,35 @@ public void run()
       }
     } catch (InterruptedException interrupt) {
       log.info("Job interrupted by InterrupedException.");
-      interruptTaskExecution(countDownLatch);
+      interruptTaskExecution(Optional.of(countDownLatch));
     }
     log.info("All assigned tasks of job {} have completed in container {}", jobId, containerIdOptional.or(""));
   }
 
-  private void interruptTaskExecution(CountDownLatch countDownLatch)
-      throws InterruptedException {
+  /**
+   * A helper function that that shuts down all outstanding tasks and
+   * shuts down the taskExecutor if it times out on a task termination.
+   */
+  private void interruptTaskExecution(Optional<CountDownLatch> countDownLatch) throws InterruptedException {
     log.info("Job interrupted. Attempting a graceful shutdown of the job.");
-    this.tasks.forEach(Task::shutdown);
-    if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
-      log.warn("Graceful shutdown of job timed out. Killing all outstanding tasks.");
-      try {
+    this.shutdownTasks();
+    try {
+      if (!countDownLatch.isPresent() || !countDownLatch.get().await(5, TimeUnit.SECONDS)) {
+        log.warn("Shutting down TaskExecutor. Killing all outstanding tasks.");
         this.taskExecutor.shutDown();
-      } catch (Throwable t) {
-        throw new RuntimeException("Failed to shutdown task executor.", t);
       }
+    } catch (Exception e) {
 
 Review comment:
   Curious why we changed from Throwable to Exception here.
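
For reference on what the narrower catch changes: `catch (Exception e)` no longer intercepts `Error`s (for example `AssertionError` or `OutOfMemoryError`), which will now propagate out of the method unwrapped. The following self-contained sketch (the `CatchScopeSketch` name is hypothetical) shows which handler sees each kind of failure:

```java
// Hypothetical sketch: which handler catches which kind of failure.
final class CatchScopeSketch {
  private CatchScopeSketch() {}

  /** Returns "none", "exception", or "throwable" depending on which handler saw the failure. */
  static String classify(Runnable shutdown) {
    try {
      try {
        shutdown.run();
        return "none";
      } catch (Exception e) {
        return "exception"; // RuntimeExceptions land here
      }
    } catch (Throwable t) {
      return "throwable"; // only Errors get past catch (Exception e)
    }
  }
}
```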

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404207115
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
 ##########
 @@ -64,19 +70,27 @@ public void run() {
         throw new IOException("File creation error: " + taskStateFile.getName());
       }
       long endTime = System.currentTimeMillis() + sleepTime * 1000;
-      while (System.currentTimeMillis() <= endTime) {
-        Thread.sleep(1000L);
-        log.warn("Sleeping for {} seconds", sleepTime);
-      }
+
+      AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(Integer.MAX_VALUE).backoffFactor(1).
+          assertTrue(new Predicate<Void>() {
+            @Override
+            public boolean apply(@Nullable Void input) {
+              return System.currentTimeMillis() > endTime;
+            }
+          }, "Waiting for the job to start...");
+
       log.info("Hello World!");
       super.run();
     } catch (InterruptedException e) {
-      log.error("Sleep interrupted.");
+      log.info("Sleep interrupted.");
 
 Review comment:
   Nit: I'd prefer to keep the log level at error here, given that this is an exception we are propagating.
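Whichever level is chosen, this branch runs when cancellation interrupts the worker thread. `Future.cancel(true)` interrupts the thread running the task, and a blocking `Thread.sleep` then throws `InterruptedException`. Below is a minimal sketch that uses a plain `ExecutorService` and a `Thread.sleep` loop like the removed lines, rather than Gobblin's `TaskExecutor` or `AssertWithBackoff`. The returned strings only mimic the task's log messages.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class InterruptibleSleepDemo {

  // Starts a long-sleeping task, cancels it with interruption, and returns the message it would log.
  static String runAndCancel() {
    AtomicReference<String> outcome = new AtomicReference<>("unknown");
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch finished = new CountDownLatch(1);
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<?> future = executor.submit(() -> {
        long endTime = System.currentTimeMillis() + 100_000L; // long sleep, so cancellation wins
        started.countDown();
        try {
          while (System.currentTimeMillis() <= endTime) {
            Thread.sleep(1000L); // blocking call that throws once the thread is interrupted
          }
          outcome.set("Hello World!");
        } catch (InterruptedException e) {
          outcome.set("Sleep interrupted.");
          Thread.currentThread().interrupt(); // keep the interrupt status for the caller
        } finally {
          finished.countDown();
        }
      });
      started.await();
      future.cancel(true); // mayInterruptIfRunning=true interrupts the worker thread
      finished.await(5, TimeUnit.SECONDS);
      return outcome.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "caller interrupted";
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(runAndCancel()); // Sleep interrupted.
  }
}
```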


[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404227870
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 ##########
 @@ -169,25 +174,35 @@ public void run()
       }
     } catch (InterruptedException interrupt) {
       log.info("Job interrupted by InterrupedException.");
-      interruptTaskExecution(countDownLatch);
+      interruptTaskExecution(Optional.of(countDownLatch));
     }
     log.info("All assigned tasks of job {} have completed in container {}", jobId, containerIdOptional.or(""));
   }
 
-  private void interruptTaskExecution(CountDownLatch countDownLatch)
-      throws InterruptedException {
+  /**
+   * A helper function that that shuts down all outstanding tasks and
+   * shuts down the taskExecutor if it times out on a task termination.
+   */
+  private void interruptTaskExecution(Optional<CountDownLatch> countDownLatch) throws InterruptedException {
     log.info("Job interrupted. Attempting a graceful shutdown of the job.");
-    this.tasks.forEach(Task::shutdown);
-    if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
-      log.warn("Graceful shutdown of job timed out. Killing all outstanding tasks.");
-      try {
+    this.shutdownTasks();
+    try {
+      if (!countDownLatch.isPresent() || !countDownLatch.get().await(5, TimeUnit.SECONDS)) {
+        log.warn("Shutting down TaskExecutor. Killing all outstanding tasks.");
         this.taskExecutor.shutDown();
-      } catch (Throwable t) {
-        throw new RuntimeException("Failed to shutdown task executor.", t);
       }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to shutdown task executor.", e);
     }
   }
 
+  /**
+   * Shutting down the whole {@link GobblinMultiTaskAttempt} by shutting down all its outstanding tasks and taskExecutor.
+   */
+  public void cancel() throws InterruptedException{
 
 Review comment:
   Can we merge the interruptTaskExecution() and cancel() methods into one, particularly since cancel() does nothing more than call interruptTaskExecution()?
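Below is a minimal sketch of what a single merged method might look like. It uses plain `java.util.concurrent` types and `java.util.Optional` in place of Guava's `Optional`. `MultiTaskAttemptSketch` and `ShutdownableTask` are hypothetical names, not Gobblin classes, and `shutdownNow()` stands in for `TaskExecutor.shutDown()`. It follows the same steps as `interruptTaskExecution` above:

1. Ask every task to shut down.
2. Wait on the latch up to a timeout.
3. Shut the executor down if the latch is absent or does not drain in time.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class MultiTaskAttemptSketch {

  // Hypothetical minimal task contract: a task that can be asked to stop gracefully.
  interface ShutdownableTask {
    void shutdown();
  }

  private final List<ShutdownableTask> tasks;
  private final ExecutorService taskExecutor;

  MultiTaskAttemptSketch(List<ShutdownableTask> tasks, ExecutorService taskExecutor) {
    this.tasks = tasks;
    this.taskExecutor = taskExecutor;
  }

  /**
   * Single entry point covering both cancel() and interruptTaskExecution().
   *
   * @return true if the executor had to be shut down forcefully
   */
  boolean cancel(Optional<CountDownLatch> countDownLatch, long timeoutSeconds) throws InterruptedException {
    tasks.forEach(ShutdownableTask::shutdown); // graceful request first
    boolean drained = countDownLatch.isPresent()
        && countDownLatch.get().await(timeoutSeconds, TimeUnit.SECONDS);
    if (!drained) {
      taskExecutor.shutdownNow(); // interrupts any still-running tasks
    }
    return !drained;
  }
}
```

With one method, callers that hold a latch (such as the `InterruptedException` handler in `run()`) would pass it in. Callers without one would presumably pass `Optional.empty()`.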


[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r405683659
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 ##########
 @@ -416,14 +425,17 @@ private boolean taskSuccessfulInPriorAttempt(String taskId) {
           // task could not be created, so directly count down
           countDownLatch.countDown();
           log.error("Could not create task for workunit {}", workUnit, e);
-        } else if (!task.hasTaskFuture()) {
+        } else if (taskFuture == null) {
           // Task was created and may have been registered, but not submitted, so call the
           // task state tracker task run completion directly since the task cancel does nothing if not submitted
           this.taskStateTracker.onTaskRunCompletion(task);
           log.error("Could not submit task for workunit {}", workUnit, e);
         } else {
-          // task was created and submitted, but failed later, so cancel the task to decrement the CountDownLatch
-          task.cancel();
+          // task was created and submitted, but failed later(and still pop back here), so cancel the task to decrement the CountDownLatch
+          if (!taskFuture.cancel(true)) {
+            log.error("Failure in terminating submitted task");
+          }
+          this.taskStateTracker.onTaskRunCompletion(task);
 
 Review comment:
   Do we want to call `this.taskStateTracker.onTaskRunCompletion(task)` if `taskFuture.cancel(true)` returns false?
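One detail that may help answer this is what each return value of `Future.cancel` means. Per its Javadoc, it returns `false` when the task has already completed, has already been cancelled, or could not be cancelled for some other reason. If the task already completed, the `finally` block of the revised `Task#run` (see the `Task.java` diff in this thread) has already called `onTaskRunCompletion`, so a second call from here could double-count. Conversely, a `true` result for a task that never started means nothing else will report its completion. Below is a minimal sketch of both outcomes with a plain `ExecutorService`; no Gobblin types are involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FutureCancelSemantics {

  // cancel() on a future whose task already finished returns false.
  static boolean cancelAfterCompletion() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<?> future = executor.submit(() -> { });
      future.get(); // wait for normal completion
      return future.cancel(true);
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdownNow();
    }
  }

  // cancel() on a task still queued behind another one returns true, and its body never runs.
  static String cancelWhileQueued() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicInteger bodyRuns = new AtomicInteger();
    CountDownLatch release = new CountDownLatch(1);
    try {
      executor.submit(() -> { // occupies the only worker thread until released
        try {
          release.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      Future<?> queued = executor.submit(() -> {
        bodyRuns.incrementAndGet();
      });
      boolean cancelled = queued.cancel(true);
      release.countDown();
      executor.shutdown();
      executor.awaitTermination(5, TimeUnit.SECONDS);
      return "cancelled=" + cancelled + ", bodyRuns=" + bodyRuns.get();
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(cancelAfterCompletion()); // false
    System.out.println(cancelWhileQueued());     // cancelled=true, bodyRuns=0
  }
}
```

A task that is already running is a third case. For a `FutureTask`, `cancel(true)` returns `true`, but the body keeps executing until it reacts to the interrupt, so its own `finally` block still runs.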


[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r387986542
 
 

 ##########
 File path: gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
 ##########
 @@ -40,15 +42,23 @@
         ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource",
         ConfigurationKeys.JOB_ID_KEY, JOB_ID,
         GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
-        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L, SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE))
-        .withFallback(rawJobConfig);
+        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L,
+
+        // Put SleepingTask in infinite sleep so that cancellation thereby ensuring cancellation to happen.
 
 Review comment:
   Change comment to: 
   // Put SleepingTask in infinite sleep allowing cancellation to happen.


[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404652815
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
 ##########
 @@ -64,19 +70,27 @@ public void run() {
         throw new IOException("File creation error: " + taskStateFile.getName());
       }
       long endTime = System.currentTimeMillis() + sleepTime * 1000;
-      while (System.currentTimeMillis() <= endTime) {
-        Thread.sleep(1000L);
-        log.warn("Sleeping for {} seconds", sleepTime);
-      }
+
+      AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(Integer.MAX_VALUE).backoffFactor(1).
+          assertTrue(new Predicate<Void>() {
+            @Override
+            public boolean apply(@Nullable Void input) {
+              return System.currentTimeMillis() > endTime;
+            }
+          }, "Waiting for the job to start...");
+
       log.info("Hello World!");
       super.run();
     } catch (InterruptedException e) {
-      log.error("Sleep interrupted.");
+      log.info("Sleep interrupted.");
 
 Review comment:
   I think I wrongly made it error in the first place. warn seems more appropriate to me: reaching this point does not mean that the SleepingTask did anything **erroneous**, only that it got interrupted. It could be info too, but it looks like an important message, especially in SleepingTask.


[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404205440
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
 ##########
 @@ -167,7 +167,7 @@ public void cancel() {
     if (_taskAttempt != null) {
       try {
         _logger.info("Task cancelled: Shutdown starting for tasks with jobId: {}", _jobId);
-        _taskAttempt.shutdownTasks();
+        _taskAttempt.cancel();
         _logger.info("Task cancelled: Shutdown complete for tasks with jobId: {}", _jobId);
 
 Review comment:
   Same here. "Cancellation complete..."


[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404222143
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
 ##########
 @@ -95,6 +78,21 @@
 import org.apache.gobblin.writer.WatermarkAwareWriter;
 import org.apache.gobblin.writer.WatermarkManager;
 import org.apache.gobblin.writer.WatermarkStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import com.google.common.base.Optional;
 
 Review comment:
   Is the ordering of imports correct? I thought org.apache.gobblin should come after com.google.*.


[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r405680816
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 ##########
 @@ -372,9 +379,9 @@ private boolean taskSuccessfulInPriorAttempt(String taskId) {
    * @param countDownLatch a {@link java.util.concurrent.CountDownLatch} waited on for job completion
    * @return a list of {@link Task}s from the {@link WorkUnit}s
    */
-  private List<Task> runWorkUnits(CountUpAndDownLatch countDownLatch) {
-
+  private Pair<List<Task>, List<Future<?>>> runWorkUnits(CountUpAndDownLatch countDownLatch) {
 
 Review comment:
   How about returning List<Pair<Task, Future<?>>> instead?
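Below is one possible shape for this. Each task travels with its own future, so there are no two parallel lists to keep index-aligned. The sketch makes two substitutions for illustration: `java.util.AbstractMap.SimpleImmutableEntry` stands in for whatever `Pair` type the PR uses, and a plain `String` id stands in for `Task`. `TaskFuturePairs` and its methods are hypothetical names.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class TaskFuturePairs {

  // Submits one Runnable per task id and keeps each id next to its own Future.
  static List<Map.Entry<String, Future<?>>> submitAll(List<String> taskIds, ExecutorService executor) {
    List<Map.Entry<String, Future<?>>> submitted = new ArrayList<>();
    for (String taskId : taskIds) {
      Future<?> future = executor.submit(() -> System.out.println("running " + taskId));
      submitted.add(new SimpleImmutableEntry<String, Future<?>>(taskId, future));
    }
    return submitted;
  }

  // Cancels every future that has not finished yet and returns the ids that were actually cancelled.
  static List<String> cancelOutstanding(List<Map.Entry<String, Future<?>>> submitted) {
    List<String> cancelled = new ArrayList<>();
    for (Map.Entry<String, Future<?>> entry : submitted) {
      if (entry.getValue().cancel(true)) {
        cancelled.add(entry.getKey());
      }
    }
    return cancelled;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    List<Map.Entry<String, Future<?>>> submitted = submitAll(Arrays.asList("task_0", "task_1"), executor);
    for (Map.Entry<String, Future<?>> entry : submitted) {
      entry.getValue().get(); // wait for each task; its id is at hand for logging
    }
    System.out.println(cancelOutstanding(submitted)); // [] because every task already finished
    executor.shutdown();
  }
}
```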


[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404229876
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
 ##########
 @@ -394,15 +391,8 @@ public void run() {
     } catch (Throwable t) {
       failTask(t);
     } finally {
-      synchronized (this) {
-        if (this.taskFuture == null || !this.taskFuture.isCancelled()) {
-          this.taskStateTracker.onTaskRunCompletion(this);
-          completeShutdown();
-          this.taskFuture = null;
-        } else {
-          LOG.info("will not decrease count down latch as this task is cancelled");
-        }
-      }
+      this.taskStateTracker.onTaskRunCompletion(this);
 
 Review comment:
   Is the removal of the synchronized block intentional? Is there a chance that Task#cancel() is invoked from multiple threads?
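The concern is that `run()`'s `finally` block and a cancellation path might race from different threads. One lock-free way to guarantee the completion callback fires exactly once in that case is an `AtomicBoolean` flipped with `compareAndSet`. This is a swapped-in technique for illustration, not what the PR or the removed `synchronized` block does. `CompletionGuard` and the counter standing in for `taskStateTracker.onTaskRunCompletion(task)` are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class CompletionGuard {

  private final AtomicBoolean completed = new AtomicBoolean(false);
  private final Runnable onCompletion; // hypothetical stand-in for taskStateTracker.onTaskRunCompletion(task)

  CompletionGuard(Runnable onCompletion) {
    this.onCompletion = onCompletion;
  }

  // Safe to call concurrently from run()'s finally block and a cancel path; only the first caller runs the callback.
  boolean complete() {
    if (completed.compareAndSet(false, true)) {
      onCompletion.run();
      return true;
    }
    return false;
  }

  // Races `callers` threads on complete() and returns how many times the callback ran.
  static int race(int callers) {
    AtomicInteger callbackRuns = new AtomicInteger();
    CompletionGuard guard = new CompletionGuard(callbackRuns::incrementAndGet);
    CountDownLatch start = new CountDownLatch(1);
    ExecutorService pool = Executors.newFixedThreadPool(callers);
    for (int i = 0; i < callers; i++) {
      pool.execute(() -> {
        try {
          start.await(); // line all threads up so they call complete() at nearly the same time
          guard.complete();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    start.countDown();
    pool.shutdown();
    try {
      pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return callbackRuns.get();
  }

  public static void main(String[] args) {
    System.out.println(race(8)); // 1
  }
}
```

Whether such a guard is needed depends on the open question above: whether cancellation can actually reach a task from more than one thread.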


[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404665523
 
 

 ##########
 File path: gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
 ##########
 @@ -20,20 +20,26 @@
 import org.junit.Assert;
 
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import static org.apache.gobblin.cluster.SleepingTask.SLEEPING_TASK_SLEEP_TIME;
 
 
 public class IntegrationJobCancelSuite extends IntegrationBasicSuite {
   public static final String JOB_ID = "job_HelloWorldTestJob_1234";
   public static final String TASK_STATE_FILE = "/tmp/IntegrationJobCancelSuite/taskState/_RUNNING";
 
   public IntegrationJobCancelSuite(Config jobConfigOverrides) {
-    super(jobConfigOverrides);
+    // Put SleepingTask in long sleep allowing cancellation to happen.
+    super(jobConfigOverrides.withValue(SLEEPING_TASK_SLEEP_TIME, ConfigValueFactory.fromAnyRef(100)));
   }
 
   @Override
   public void waitForAndVerifyOutputFiles() throws Exception {
+    // SleepingTask is in an infinite sleep. The log line is printed only when a cancellation in invoked.
+    Assert.assertTrue(verifyFileForMessage(this.jobLogOutputFile, "Sleep interrupted"));
+
     // If the job is cancelled, it should not have been able to write 'Hello World!'
     Assert.assertFalse(verifyFileForMessage(this.jobLogOutputFile, "Hello World!"));
 
 Review comment:
   Can we keep the deleted line? A bug was fixed there, and that line checks that the NPE does not happen. Yeah, it can probably be replaced by Assert.assertFalse(verifyFileForMessage(this.jobLogOutputFile, "Exception"));
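Below is a minimal sketch of the suggested check. It assumes a helper like `verifyFileForMessage` just reports whether any line of the log contains a substring; the suite's real helper may differ, and `LogAssertions` and `fileContains` are hypothetical names. The sketch looks for `NullPointerException` rather than the bare `"Exception"` suggested above. A bare substring would also match benign lines such as `Job interrupted by InterrupedException.`, which `GobblinMultiTaskAttempt` logs in the diff in this thread, if that line ends up in the same log file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

class LogAssertions {

  // Hypothetical helper: true if any line of the log file contains the message.
  static boolean fileContains(Path logFile, String message) {
    try (Stream<String> lines = Files.lines(logFile, StandardCharsets.UTF_8)) {
      return lines.anyMatch(line -> line.contains(message));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // The cancellation test's expectations: interrupted, no "Hello World!", and no NPE logged.
  static void verifyCancelledCleanly(Path logFile) {
    if (!fileContains(logFile, "Sleep interrupted")) {
      throw new AssertionError("task was not interrupted");
    }
    if (fileContains(logFile, "Hello World!")) {
      throw new AssertionError("cancelled task still wrote its output");
    }
    if (fileContains(logFile, "NullPointerException")) {
      throw new AssertionError("an NPE was logged during cancellation");
    }
  }

  public static void main(String[] args) throws IOException {
    Path log = Files.createTempFile("job", ".log");
    Files.write(log, "INFO Sleep interrupted.\n".getBytes(StandardCharsets.UTF_8));
    verifyCancelledCleanly(log); // passes: interrupted, no output, no NPE
    Files.delete(log);
  }
}
```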


[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r387987035
 
 

 ##########
 File path: gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
 ##########
 @@ -40,15 +42,23 @@
         ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource",
         ConfigurationKeys.JOB_ID_KEY, JOB_ID,
         GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
-        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L, SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE))
-        .withFallback(rawJobConfig);
+        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L,
+
+        // Put SleepingTask in infinite sleep so that cancellation thereby ensuring cancellation to happen.
+        SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE))
+        .withFallback(rawJobConfig.withValue(SLEEPING_TASK_SLEEP_TIME, ConfigValueFactory.fromAnyRef(-1)));
     return ImmutableMap.of(JOB_NAME, newConfig);
   }
 
+  /**
+   * Note This verification now ties to {@link SleepingTask} which is not ideal.
 
 Review comment:
   Note: This verification is now tied to....


[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404209616
 
 

 ##########
 File path: gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
 ##########
 @@ -121,7 +124,8 @@ void testJobShouldGetCancelled() throws Exception {
    *   We confirm the execution by again inspecting the zNode and ensuring its TargetState is START. </li>
    * </ul>
    */
-  @Test (dependsOnMethods = { "testJobShouldGetCancelled" }, groups = {"disabledOnTravis"})
+//  @Test (dependsOnMethods = { "testJobShouldGetCancelled" }, groups = {"disabledOnTravis"})
 
 Review comment:
   Can we remove the commented line?
