You are viewing a plain-text version of this content. The canonical link for it is available in the original (HTML) version of this archive page.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/09 22:12:45 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-817] Implement a workaround for Helix Workflow being stuck in STOPPING state.[]

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 50280ee  [GOBBLIN-817] Implement a workaround for Helix Workflow being stuck in STOPPING state.[]
50280ee is described below

commit 50280ee6bd591e74746faf4cb4452733095e3c36
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Aug 9 15:12:30 2019 -0700

    [GOBBLIN-817] Implement a workaround for Helix Workflow being stuck in STOPPING state.[]
    
    Closes #2681 from sv2000/helixStoppingWorkaround
---
 .../cluster/GobblinClusterConfigurationKeys.java     |  3 +++
 .../GobblinHelixDistributeJobExecutionLauncher.java  |  8 ++++++--
 .../gobblin/cluster/GobblinHelixJobLauncher.java     |  8 +++++++-
 .../java/org/apache/gobblin/cluster/HelixUtils.java  | 20 +++++++++++++++++---
 4 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 4881a99..15c197f 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -164,4 +164,7 @@ public class GobblinClusterConfigurationKeys {
 
   public static final String CANCEL_RUNNING_JOB_ON_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.cancelRunningJobOnDelete";
   public static final String DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE = "false";
+
+  public static final String HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "job.stoppingStateTimeoutSeconds";
+  public static final long DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS = 300;
 }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index 36239a2..cf07e38 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -57,7 +57,6 @@ import org.apache.gobblin.runtime.api.JobExecutionMonitor;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.MonitoredObject;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 
 
@@ -304,12 +303,17 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
         GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
         GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
 
+    long stoppingStateTimeoutInSeconds = PropertiesUtils
+        .getPropAsLong(this.jobPlanningProps, GobblinClusterConfigurationKeys.HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS,
+            GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS);
+
     try {
       HelixUtils.waitJobCompletion(
           GobblinHelixDistributeJobExecutionLauncher.this.planningJobHelixManager,
           workFlowName,
           jobName,
-          timeoutEnabled ? Optional.of(timeoutInSeconds) : Optional.empty());
+          timeoutEnabled ? Optional.of(timeoutInSeconds) : Optional.empty(),
+          stoppingStateTimeoutInSeconds);
       return getResultFromUserContent();
     } catch (TimeoutException te) {
       HelixUtils.handleJobTimeout(workFlowName, jobName,
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 6d41cc7..d37dab1 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -69,6 +69,7 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.Id;
 import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.util.SerializationUtils;
 
 
@@ -429,12 +430,17 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
         GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
         GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
 
+    long stoppingStateTimeoutInSeconds = PropertiesUtils
+        .getPropAsLong(this.jobProps, GobblinClusterConfigurationKeys.HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS,
+            GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS);
+
     try {
       HelixUtils.waitJobCompletion(
           this.helixManager,
           this.helixWorkFlowName,
           this.jobContext.getJobId(),
-          timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty());
+          timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty(),
+          stoppingStateTimeoutInSeconds);
     } catch (TimeoutException te) {
       HelixUtils.handleJobTimeout(helixWorkFlowName, jobContext.getJobId(),
           helixManager, this, this.jobListener);
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index b874ff3..340c9e7 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -156,14 +156,17 @@ public class HelixUtils {
   }
 
   static void waitJobCompletion(HelixManager helixManager, String workFlowName, String jobName,
-      Optional<Long> timeoutInSeconds) throws InterruptedException, TimeoutException {
-
+      Optional<Long> timeoutInSeconds, Long stoppingStateTimeoutInSeconds) throws InterruptedException, TimeoutException {
     log.info("Waiting for job {} to complete...", jobName);
     long endTime = 0;
+    long currentTimeMillis = System.currentTimeMillis();
+
     if (timeoutInSeconds.isPresent()) {
-      endTime = System.currentTimeMillis() + timeoutInSeconds.get() * 1000;
+      endTime = currentTimeMillis + timeoutInSeconds.get() * 1000;
     }
 
+    long stoppingStateEndTime = currentTimeMillis + stoppingStateTimeoutInSeconds * 1000;
+
     while (!timeoutInSeconds.isPresent() || System.currentTimeMillis() <= endTime) {
       WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
       if (workflowContext != null) {
@@ -177,6 +180,16 @@ public class HelixUtils {
           case FAILED:
           case COMPLETED:
           return;
+          case STOPPING:
+            log.info("Waiting for job {} to complete... State - {}", jobName, jobState);
+            Thread.sleep(1000);
+            // Workaround for a Helix bug where a job may be stuck in the STOPPING state due to an unresponsive task.
+            if (System.currentTimeMillis() > stoppingStateEndTime) {
+              log.info("Deleting workflow {}", workFlowName);
+              new TaskDriver(helixManager).delete(workFlowName);
+              log.info("Deleted workflow {}", workFlowName);
+            }
+            return;
           default:
             log.info("Waiting for job {} to complete... State - {}", jobName, jobState);
             Thread.sleep(1000);
@@ -257,6 +270,7 @@ public class HelixUtils {
     new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
     log.info("Workflow deleted.");
   }
+
   /**
    * Returns the Helix Workflow Ids given {@link Iterable} of Gobblin job names. The method returns a
    * {@link java.util.Map} from Gobblin job name to the corresponding Helix Workflow Id. This method iterates