You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/09 22:12:45 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-817] Implement a workaround for Helix Workflow being stuck in STOPPING state.[]
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 50280ee [GOBBLIN-817] Implement a workaround for Helix Workflow being stuck in STOPPING state.[]
50280ee is described below
commit 50280ee6bd591e74746faf4cb4452733095e3c36
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Aug 9 15:12:30 2019 -0700
[GOBBLIN-817] Implement a workaround for Helix Workflow being stuck in STOPPING state.[]
Closes #2681 from sv2000/helixStoppingWorkaround
---
.../cluster/GobblinClusterConfigurationKeys.java | 3 +++
.../GobblinHelixDistributeJobExecutionLauncher.java | 8 ++++++--
.../gobblin/cluster/GobblinHelixJobLauncher.java | 8 +++++++-
.../java/org/apache/gobblin/cluster/HelixUtils.java | 20 +++++++++++++++++---
4 files changed, 33 insertions(+), 6 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 4881a99..15c197f 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -164,4 +164,7 @@ public class GobblinClusterConfigurationKeys {
public static final String CANCEL_RUNNING_JOB_ON_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.cancelRunningJobOnDelete";
public static final String DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE = "false";
+
+ public static final String HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "job.stoppingStateTimeoutSeconds";
+ public static final long DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS = 300;
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index 36239a2..cf07e38 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -57,7 +57,6 @@ import org.apache.gobblin.runtime.api.JobExecutionMonitor;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.MonitoredObject;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.PropertiesUtils;
@@ -304,12 +303,17 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
+ long stoppingStateTimeoutInSeconds = PropertiesUtils
+ .getPropAsLong(this.jobPlanningProps, GobblinClusterConfigurationKeys.HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS,
+ GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS);
+
try {
HelixUtils.waitJobCompletion(
GobblinHelixDistributeJobExecutionLauncher.this.planningJobHelixManager,
workFlowName,
jobName,
- timeoutEnabled ? Optional.of(timeoutInSeconds) : Optional.empty());
+ timeoutEnabled ? Optional.of(timeoutInSeconds) : Optional.empty(),
+ stoppingStateTimeoutInSeconds);
return getResultFromUserContent();
} catch (TimeoutException te) {
HelixUtils.handleJobTimeout(workFlowName, jobName,
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 6d41cc7..d37dab1 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -69,6 +69,7 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.SerializationUtils;
@@ -429,12 +430,17 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
+ long stoppingStateTimeoutInSeconds = PropertiesUtils
+ .getPropAsLong(this.jobProps, GobblinClusterConfigurationKeys.HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS,
+ GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS);
+
try {
HelixUtils.waitJobCompletion(
this.helixManager,
this.helixWorkFlowName,
this.jobContext.getJobId(),
- timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty());
+ timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty(),
+ stoppingStateTimeoutInSeconds);
} catch (TimeoutException te) {
HelixUtils.handleJobTimeout(helixWorkFlowName, jobContext.getJobId(),
helixManager, this, this.jobListener);
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index b874ff3..340c9e7 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -156,14 +156,17 @@ public class HelixUtils {
}
static void waitJobCompletion(HelixManager helixManager, String workFlowName, String jobName,
- Optional<Long> timeoutInSeconds) throws InterruptedException, TimeoutException {
-
+ Optional<Long> timeoutInSeconds, Long stoppingStateTimeoutInSeconds) throws InterruptedException, TimeoutException {
log.info("Waiting for job {} to complete...", jobName);
long endTime = 0;
+ long currentTimeMillis = System.currentTimeMillis();
+
if (timeoutInSeconds.isPresent()) {
- endTime = System.currentTimeMillis() + timeoutInSeconds.get() * 1000;
+ endTime = currentTimeMillis + timeoutInSeconds.get() * 1000;
}
+ long stoppingStateEndTime = currentTimeMillis + stoppingStateTimeoutInSeconds * 1000;
+
while (!timeoutInSeconds.isPresent() || System.currentTimeMillis() <= endTime) {
WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
if (workflowContext != null) {
@@ -177,6 +180,16 @@ public class HelixUtils {
case FAILED:
case COMPLETED:
return;
+ case STOPPING:
+ log.info("Waiting for job {} to complete... State - {}", jobName, jobState);
+ Thread.sleep(1000);
+ // Workaround for a Helix bug where a job may be stuck in the STOPPING state due to an unresponsive task.
+ if (System.currentTimeMillis() > stoppingStateEndTime) {
+ log.info("Deleting workflow {}", workFlowName);
+ new TaskDriver(helixManager).delete(workFlowName);
+ log.info("Deleted workflow {}", workFlowName);
+ }
+ return;
default:
log.info("Waiting for job {} to complete... State - {}", jobName, jobState);
Thread.sleep(1000);
@@ -257,6 +270,7 @@ public class HelixUtils {
new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
log.info("Workflow deleted.");
}
+
/**
* Returns the Helix Workflow Ids given {@link Iterable} of Gobblin job names. The method returns a
* {@link java.util.Map} from Gobblin job name to the corresponding Helix Workflow Id. This method iterates