You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/08/15 20:55:06 UTC

incubator-gobblin git commit: [GOBBLIN-159] Allow Helix jobs to be gracefully canceled

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 7e63c7b91 -> 56cecb28a


[GOBBLIN-159] Allow Helix jobs to be gracefully canceled

[GOBBLIN-159] Allow Helix jobs to be gracefully
canceled.

Fixed some PR comments

Add flag to check if cancellation was requested
and cancel only if that flag is set.

Closes #2037 from kadaan/Fix_for_GOBBLIN-159


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/56cecb28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/56cecb28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/56cecb28

Branch: refs/heads/master
Commit: 56cecb28addb912d4e2668ff2d271c99ca3c4cdc
Parents: 7e63c7b
Author: Joel Baranick <jo...@ensighten.com>
Authored: Tue Aug 15 13:55:01 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Tue Aug 15 13:55:01 2017 -0700

----------------------------------------------------------------------
 .../apache/gobblin/cluster/GobblinHelixJob.java | 45 ++++++-----
 .../apache/gobblin/scheduler/JobScheduler.java  | 80 +++++++++++++++++++-
 2 files changed, 104 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56cecb28/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
index d9eab64..265be3c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
@@ -19,25 +19,26 @@ package org.apache.gobblin.cluster;
 
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 
+import org.quartz.InterruptableJob;
 import org.quartz.Job;
 import org.quartz.JobDataMap;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.scheduler.BaseGobblinJob;
 import org.apache.gobblin.scheduler.JobScheduler;
+import org.quartz.UnableToInterruptJobException;
 
 
 /**
@@ -48,7 +49,9 @@ import org.apache.gobblin.scheduler.JobScheduler;
  */
 @Alpha
 @Slf4j
-public class GobblinHelixJob extends BaseGobblinJob {
+public class GobblinHelixJob extends BaseGobblinJob implements InterruptableJob {
+  private Future cancellable = null;
+
   @Override
   public void executeImpl(JobExecutionContext context) throws JobExecutionException {
     JobDataMap dataMap = context.getJobDetail().getJobDataMap();
@@ -65,28 +68,32 @@ public class GobblinHelixJob extends BaseGobblinJob {
 
     try {
       final JobLauncher jobLauncher = new GobblinHelixJobLauncher(jobProps, helixManager, appWorkDir, eventMetadata);
-
       if (Boolean.valueOf(jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD,
-          Boolean.toString(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT)))) {
+              Boolean.toString(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT)))) {
         jobScheduler.runJob(jobProps, jobListener, jobLauncher);
       } else {
-        // if not executing in the scheduling thread then submit a runnable to the job scheduler's ExecutorService
-        // for asynchronous execution.
-        Runnable runnable = new Runnable() {
-          @Override
-          public void run() {
-            try {
-              jobScheduler.runJob(jobProps, jobListener, jobLauncher);
-            } catch (JobException je) {
-              log.error("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
-            }
-          }
-        };
-
-        jobScheduler.submitRunnableToExecutor(runnable);
+        cancellable = jobScheduler.scheduleJobImmediately(jobProps, jobListener, jobLauncher);
       }
     } catch (Throwable t) {
       throw new JobExecutionException(t);
     }
   }
+
+  @Override
+  public void interrupt() throws UnableToInterruptJobException {
+    if (cancellable != null) {
+      try {
+        if (cancellable.cancel(false)) {
+          return;
+        }
+      } catch (Exception e) {
+        log.error("Failed to gracefully cancel job. Attempting to force cancellation.", e);
+      }
+      try {
+        cancellable.cancel(true);
+      } catch (Exception e) {
+        throw new UnableToInterruptJobException(e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56cecb28/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index 0772d60..2313c13 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -24,10 +24,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.hadoop.fs.Path;
@@ -131,6 +135,8 @@ public class JobScheduler extends AbstractIdleService {
 
   private final Closer closer = Closer.create();
 
+  private volatile boolean cancelRequested = false;
+
   public JobScheduler(Properties properties, SchedulerService scheduler)
       throws Exception {
     this.properties = properties;
@@ -199,10 +205,14 @@ public class JobScheduler extends AbstractIdleService {
       throws Exception {
     LOG.info("Stopping the job scheduler");
     closer.close();
-
+    cancelRequested = true;
     List<JobExecutionContext> currentExecutions = this.scheduler.getScheduler().getCurrentlyExecutingJobs();
     for (JobExecutionContext jobExecutionContext : currentExecutions) {
-      this.scheduler.getScheduler().interrupt(jobExecutionContext.getFireInstanceId());
+      try {
+        this.scheduler.getScheduler().interrupt(jobExecutionContext.getFireInstanceId());
+      } catch (UnableToInterruptJobException e) {
+        LOG.error("Failed to cancel job " + jobExecutionContext.getJobDetail().getKey(), e);
+      }
     }
 
     ExecutorsUtils.shutdownExecutorService(this.jobExecutor, Optional.of(LOG));
@@ -231,6 +241,72 @@ public class JobScheduler extends AbstractIdleService {
   }
 
   /**
+   * Schedule a job immediately.
+   *
+   * <p>
+   *   This method submits the job to this scheduler's {@link ExecutorService} for asynchronous execution.
+   * </p>
+   *
+   * @param jobProps Job configuration properties
+   * @param jobListener {@link JobListener} used for callback,
+   *                    can be <em>null</em> if no callback is needed.
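+   * @param jobLauncher {@link JobLauncher} used to run the job and to cancel it on request
+   * @return a {@link Future} for the submitted job; a {@link JobException} raised while running is logged, not thrown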
+   */
+  public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher) {
+    Runnable runnable = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          runJob(jobProps, jobListener, jobLauncher);
+        } catch (JobException je) {
+          LOG.error("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
+        }
+      }
+    };
+    final Future<?> future = this.jobExecutor.submit(runnable);
+    return new Future() {
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning) {
+        if (!cancelRequested) {
+          return false;
+        }
+        boolean result = true;
+        try {
+          jobLauncher.cancelJob(jobListener);
+        } catch (JobException e) {
+          LOG.error("Failed to cancel job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+          result = false;
+        }
+        if (mayInterruptIfRunning) {
+          result &= future.cancel(true);
+        }
+        return result;
+      }
+
+      @Override
+      public boolean isCancelled() {
+        return future.isCancelled();
+      }
+
+      @Override
+      public boolean isDone() {
+        return future.isDone();
+      }
+
+      @Override
+      public Object get() throws InterruptedException, ExecutionException {
+        return future.get();
+      }
+
+      @Override
+      public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        return future.get(timeout, unit);
+      }
+    };
+  }
+
+  /**
    * Submit a runnable to the {@link ExecutorService} of this {@link JobScheduler}.
    * @param runnable the runnable to submit to the job executor
    */