You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/08/15 20:55:06 UTC
incubator-gobblin git commit: [GOBBLIN-159] Allow Helix jobs to be
gracefully canceled
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 7e63c7b91 -> 56cecb28a
[GOBBLIN-159] Allow Helix jobs to be gracefully canceled
[GOBBLIN-159] Allow Helix jobs to be gracefully
canceled.
Fixed some PR comments
Add flag to check if cancellation was requested
and cancel only if that flag is set.
Closes #2037 from kadaan/Fix_for_GOBBLIN-159
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/56cecb28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/56cecb28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/56cecb28
Branch: refs/heads/master
Commit: 56cecb28addb912d4e2668ff2d271c99ca3c4cdc
Parents: 7e63c7b
Author: Joel Baranick <jo...@ensighten.com>
Authored: Tue Aug 15 13:55:01 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Tue Aug 15 13:55:01 2017 -0700
----------------------------------------------------------------------
.../apache/gobblin/cluster/GobblinHelixJob.java | 45 ++++++-----
.../apache/gobblin/scheduler/JobScheduler.java | 80 +++++++++++++++++++-
2 files changed, 104 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56cecb28/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
index d9eab64..265be3c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
@@ -19,25 +19,26 @@ package org.apache.gobblin.cluster;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
+import org.quartz.InterruptableJob;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.scheduler.BaseGobblinJob;
import org.apache.gobblin.scheduler.JobScheduler;
+import org.quartz.UnableToInterruptJobException;
/**
@@ -48,7 +49,9 @@ import org.apache.gobblin.scheduler.JobScheduler;
*/
@Alpha
@Slf4j
-public class GobblinHelixJob extends BaseGobblinJob {
+public class GobblinHelixJob extends BaseGobblinJob implements InterruptableJob {
+ private Future cancellable = null;
+
@Override
public void executeImpl(JobExecutionContext context) throws JobExecutionException {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
@@ -65,28 +68,32 @@ public class GobblinHelixJob extends BaseGobblinJob {
try {
final JobLauncher jobLauncher = new GobblinHelixJobLauncher(jobProps, helixManager, appWorkDir, eventMetadata);
-
if (Boolean.valueOf(jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD,
- Boolean.toString(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT)))) {
+ Boolean.toString(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT)))) {
jobScheduler.runJob(jobProps, jobListener, jobLauncher);
} else {
- // if not executing in the scheduling thread then submit a runnable to the job scheduler's ExecutorService
- // for asynchronous execution.
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- try {
- jobScheduler.runJob(jobProps, jobListener, jobLauncher);
- } catch (JobException je) {
- log.error("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
- }
- }
- };
-
- jobScheduler.submitRunnableToExecutor(runnable);
+ cancellable = jobScheduler.scheduleJobImmediately(jobProps, jobListener, jobLauncher);
}
} catch (Throwable t) {
throw new JobExecutionException(t);
}
}
+
+ @Override
+ public void interrupt() throws UnableToInterruptJobException {
+ if (cancellable != null) {
+ try {
+ if (cancellable.cancel(false)) {
+ return;
+ }
+ } catch (Exception e) {
+ log.error("Failed to gracefully cancel job. Attempting to force cancellation.", e);
+ }
+ try {
+ cancellable.cancel(true);
+ } catch (Exception e) {
+ throw new UnableToInterruptJobException(e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56cecb28/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index 0772d60..2313c13 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -24,10 +24,14 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.hadoop.fs.Path;
@@ -131,6 +135,8 @@ public class JobScheduler extends AbstractIdleService {
private final Closer closer = Closer.create();
+ private volatile boolean cancelRequested = false;
+
public JobScheduler(Properties properties, SchedulerService scheduler)
throws Exception {
this.properties = properties;
@@ -199,10 +205,14 @@ public class JobScheduler extends AbstractIdleService {
throws Exception {
LOG.info("Stopping the job scheduler");
closer.close();
-
+ cancelRequested = true;
List<JobExecutionContext> currentExecutions = this.scheduler.getScheduler().getCurrentlyExecutingJobs();
for (JobExecutionContext jobExecutionContext : currentExecutions) {
- this.scheduler.getScheduler().interrupt(jobExecutionContext.getFireInstanceId());
+ try {
+ this.scheduler.getScheduler().interrupt(jobExecutionContext.getFireInstanceId());
+ } catch (UnableToInterruptJobException e) {
+ LOG.error("Failed to cancel job " + jobExecutionContext.getJobDetail().getKey(), e);
+ }
}
ExecutorsUtils.shutdownExecutorService(this.jobExecutor, Optional.of(LOG));
@@ -231,6 +241,72 @@ public class JobScheduler extends AbstractIdleService {
}
/**
+ * Schedule a job immediately.
+ *
+ * <p>
+ * This method calls the Quartz scheduler to scheduler the job.
+ * </p>
+ *
+ * @param jobProps Job configuration properties
+ * @param jobListener {@link JobListener} used for callback,
+ * can be <em>null</em> if no callback is needed.
+ * @throws JobException when there is anything wrong
+ * with scheduling the job
+ */
+ public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher) {
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ runJob(jobProps, jobListener, jobLauncher);
+ } catch (JobException je) {
+ LOG.error("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
+ }
+ }
+ };
+ final Future<?> future = this.jobExecutor.submit(runnable);
+ return new Future() {
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (!cancelRequested) {
+ return false;
+ }
+ boolean result = true;
+ try {
+ jobLauncher.cancelJob(jobListener);
+ } catch (JobException e) {
+ LOG.error("Failed to cancel job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+ result = false;
+ }
+ if (mayInterruptIfRunning) {
+ result &= future.cancel(true);
+ }
+ return result;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return future.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return future.isDone();
+ }
+
+ @Override
+ public Object get() throws InterruptedException, ExecutionException {
+ return future.get();
+ }
+
+ @Override
+ public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return future.get(timeout, unit);
+ }
+ };
+ }
+
+ /**
* Submit a runnable to the {@link ExecutorService} of this {@link JobScheduler}.
* @param runnable the runnable to submit to the job executor
*/