You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/02/22 17:40:50 UTC
incubator-gobblin git commit: [GOBBLIN-302] Handle stuck Helix workflow
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 52bf10de7 -> 97e29f436
[GOBBLIN-302] Handle stuck Helix workflow
Closes #2157 from arjun4084346/stuckHelixJob
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/97e29f43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/97e29f43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/97e29f43
Branch: refs/heads/master
Commit: 97e29f436db8a5949c9d7b593c80097a8952ab68
Parents: 52bf10d
Author: Arjun <ab...@linkedin.com>
Authored: Thu Feb 22 09:40:51 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Feb 22 09:40:51 2018 -0800
----------------------------------------------------------------------
.../configuration/ConfigurationKeys.java | 4 ++
.../cluster/GobblinHelixJobLauncher.java | 41 ++++++++++++++++++--
.../runtime/GobblinMultiTaskAttempt.java | 12 +++++-
.../java/org/apache/gobblin/runtime/Task.java | 30 +++++++++++++-
4 files changed, 80 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/97e29f43/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index d07d740..c80ceaf 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -108,6 +108,10 @@ public class ConfigurationKeys {
public static final String SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY = "scheduler.wait.for.job.completion";
public static final String DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION = Boolean.TRUE.toString();
+ public static final String HELIX_JOB_TIMEOUT_ENABLED_KEY = "job.timeout.enabled";
+ public static final String DEFAULT_HELIX_JOB_TIMEOUT_ENABLED = "false";
+ public static final String HELIX_JOB_TIMEOUT_SECONDS = "job.timeout.seconds";
+ public static final String DEFAULT_HELIX_JOB_TIMEOUT_SECONDS = "10800";
/**
* Task executor and state tracker configuration properties.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/97e29f43/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index af15469..62c9b3f 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -107,6 +107,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
private final TaskDriver helixTaskDriver;
private final String helixQueueName;
private final String jobResourceName;
+ private JobListener jobListener;
private final FileSystem fs;
private final Path appWorkDir;
@@ -167,6 +168,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
this.taskStateCollectorService = new TaskStateCollectorService(jobProps, this.jobContext.getJobState(),
this.eventBus, this.stateStores.getTaskStateStore(), outputTaskStateDir);
+ startCancellationExecutor();
}
@Override
@@ -290,6 +292,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
public void launchJob(@Nullable JobListener jobListener)
throws JobException {
+ this.jobListener = jobListener;
boolean isLaunched = false;
this.runningMap.putIfAbsent(this.jobContext.getJobName(), false);
try {
@@ -359,8 +362,14 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
return workUnitFile.toString();
}
- private void waitForJobCompletion() throws InterruptedException {
- while (true) {
+ private void waitForJobCompletion() throws InterruptedException {
+ LOGGER.info("Waiting for job to complete...");
+ boolean timeoutEnabled = Boolean.parseBoolean(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
+ ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
+ long timeoutInSeconds = Long.parseLong(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
+ ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
+ long endTime = System.currentTimeMillis() + timeoutInSeconds*1000;
+ while (!timeoutEnabled || System.currentTimeMillis() <= endTime) {
WorkflowContext workflowContext = TaskDriver.getWorkflowContext(this.helixManager, this.helixQueueName);
if (workflowContext != null) {
org.apache.helix.task.TaskState helixJobState = workflowContext.getJobState(this.jobResourceName);
@@ -370,9 +379,35 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
return;
}
}
-
Thread.sleep(1000);
}
+ helixTaskDriverWaitToStop(this.helixQueueName, 10L);
+ try {
+ cancelJob(this.jobListener);
+ } catch (JobException e) {
+ throw new RuntimeException("Unable to cancel job " + jobContext.getJobName() + ": ", e);
+ }
+ this.helixTaskDriver.resume(this.helixQueueName);
+ LOGGER.info("stopped the queue, deleted the job");
+ }
+
+ /**
+ * Because fix https://github.com/apache/helix/commit/ae8e8e2ef37f48d782fc12f85ca97728cf2b70c4
+ * is not available in currently used version 0.6.9
+ */
+ private void helixTaskDriverWaitToStop(String workflow, long timeoutInSeconds) throws InterruptedException {
+ this.helixTaskDriver.stop(workflow);
+ long endTime = System.currentTimeMillis() + timeoutInSeconds*1000;
+ while (System.currentTimeMillis() <= endTime) {
+ WorkflowContext workflowContext = TaskDriver.getWorkflowContext(this.helixManager, this.helixQueueName);
+ if (workflowContext == null || workflowContext.getWorkflowState()
+ .equals(org.apache.helix.task.TaskState.IN_PROGRESS)) {
+ Thread.sleep(1000);
+ } else {
+ LOGGER.info("Successfully stopped the queue");
+ return;
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/97e29f43/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index aa42121..e5643c0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -202,9 +202,17 @@ public class GobblinMultiTaskAttempt {
task.shutdown();
}
- for (Task task: this.tasks) {
+ for (Task task : this.tasks) {
task.awaitShutdown(1000);
}
+
+ for (Task task : this.tasks) {
+ if (task.cancel()) {
+ log.info("Task {} cancelled.", task.getTaskId());
+ } else {
+ log.info("Task {} could not be cancelled.", task.getTaskId());
+ }
+ }
}
private void persistTaskStateStore()
@@ -343,8 +351,8 @@ public class GobblinMultiTaskAttempt {
// Create a new task from the work unit and submit the task to run
Task task = createTaskRunnable(workUnitState, countDownLatch);
this.taskStateTracker.registerNewTask(task);
+ task.setTaskFuture(this.taskExecutor.submit(task));
tasks.add(task);
- this.taskExecutor.execute(task);
}
new EventSubmitter.Builder(JobMetrics.get(this.jobId).getMetricContext(), "gobblin.runtime").build()
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/97e29f43/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index c3c1b99..52b0960 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -150,6 +150,7 @@ public class Task implements TaskIFace {
private final AtomicBoolean shutdownRequested;
private volatile long shutdownRequestedTime = Long.MAX_VALUE;
private final CountDownLatch shutdownLatch;
+ private Future<?> taskFuture;
/**
* Instantiate a new {@link Task}.
@@ -364,8 +365,15 @@ public class Task implements TaskIFace {
} catch (Throwable t) {
failTask(t);
} finally {
- this.taskStateTracker.onTaskRunCompletion(this);
- completeShutdown();
+ synchronized (this) {
+ if (this.taskFuture == null || !this.taskFuture.isCancelled()) {
+ this.taskStateTracker.onTaskRunCompletion(this);
+ completeShutdown();
+ this.taskFuture = null;
+ } else {
+ LOG.info("will not decrease count down latch as this task is cancelled");
+ }
+ }
}
}
@@ -952,4 +960,22 @@ public class Task implements TaskIFace {
}
return true;
}
+
+ public synchronized void setTaskFuture(Future<?> taskFuture) {
+ this.taskFuture = taskFuture;
+ }
+
+ /**
+ * return true if the task is successfully cancelled.
+ * @return
+ */
+ public synchronized boolean cancel() {
+ if (this.taskFuture != null && this.taskFuture.cancel(true)) {
+ this.taskStateTracker.onTaskRunCompletion(this);
+ this.completeShutdown();
+ return true;
+ } else {
+ return false;
+ }
+ }
}