You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/10/24 22:46:11 UTC

[gobblin] branch master updated: give option to cancel helix workflow through Delete API (#3580)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a6dcfc8f give option to cancel helix workflow through Delete API (#3580)
7a6dcfc8f is described below

commit 7a6dcfc8f98229914c75d38d480b6e8af80a1085
Author: Hanghang Nate Liu <na...@gmail.com>
AuthorDate: Mon Oct 24 15:46:03 2022 -0700

    give option to cancel helix workflow through Delete API (#3580)
    
    change log string
    
    change all waitToStop to new cancel method
    
    update imports
    
    address comments
    
    checkstyle
---
 .../cluster/GobblinClusterConfigurationKeys.java   |  6 +++++
 ...GobblinHelixDistributeJobExecutionLauncher.java | 25 +++++++++----------
 .../gobblin/cluster/GobblinHelixJobLauncher.java   | 15 ++++++------
 .../gobblin/cluster/GobblinHelixJobScheduler.java  | 14 ++++++-----
 .../org/apache/gobblin/cluster/HelixUtils.java     | 28 ++++++++++++++++------
 5 files changed, 55 insertions(+), 33 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 4ae21c1d3..d2a60781d 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -175,6 +175,12 @@ public class GobblinClusterConfigurationKeys {
   public static final String CANCEL_RUNNING_JOB_ON_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.cancelRunningJobOnDelete";
   public static final String DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE = "false";
 
+  // By default we cancel a job by calling the Helix Stop API. In some cases, jobs hang in the STOPPING state and prevent
+  // new jobs from being launched. This config gives the option to cancel jobs by calling the Delete API instead. Directly
+  // deleting a Helix workflow should be safe for Gobblin, since Gobblin jobs are stateless to Helix (Gobblin keeps its own state store).
+  public static final String CANCEL_HELIX_JOB_BY_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.cancelHelixJobByDelete";
+  public static final boolean DEFAULT_CANCEL_HELIX_JOB_BY_DELETE = false;
+
   public static final String HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "job.stoppingStateTimeoutSeconds";
   public static final long DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS = 300;
   public static final String CONTAINER_HEALTH_METRICS_SERVICE_ENABLED = GOBBLIN_CLUSTER_PREFIX + "container.health.metrics.service.enabled" ;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index 2f0322549..f2ba083c5 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -110,6 +110,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
   @Getter
   private DistributeJobMonitor jobMonitor;
 
+  private final Config combinedConfigs;
+
   public GobblinHelixDistributeJobExecutionLauncher(Builder builder) {
     this.planningJobHelixManager = builder.planningJobHelixManager;
 
@@ -118,19 +120,19 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
     this.jobPlanningProps = builder.jobPlanningProps;
     this.jobSubmitted = false;
 
-    Config combined = ConfigUtils.propertiesToConfig(jobPlanningProps)
+    combinedConfigs = ConfigUtils.propertiesToConfig(jobPlanningProps)
         .withFallback(ConfigUtils.propertiesToConfig(sysProps));
 
-    this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(combined,
+    this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(this.combinedConfigs,
         GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
         GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
     this.planningJobLauncherMetrics = builder.planningJobLauncherMetrics;
-    this.nonBlockingMode = ConfigUtils.getBoolean(combined,
+    this.nonBlockingMode = ConfigUtils.getBoolean(this.combinedConfigs,
         GobblinClusterConfigurationKeys.NON_BLOCKING_PLANNING_JOB_ENABLED,
         GobblinClusterConfigurationKeys.DEFAULT_NON_BLOCKING_PLANNING_JOB_ENABLED);
     this.helixMetrics = builder.helixMetrics;
     this.jobsMapping = builder.jobsMapping;
-    this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(combined,
+    this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(this.combinedConfigs,
         GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
         GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS);
   }
@@ -144,18 +146,17 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
       String planningJobId = getPlanningJobId(this.jobPlanningProps);
       try {
         if (this.cancellationRequested && !this.cancellationExecuted) {
-          // TODO : fix this when HELIX-1180 is completed
-          // work flow should never be deleted explicitly because it has a expiry time
-          // If cancellation is requested, we should set the job state to CANCELLED/ABORT
-          this.helixTaskDriver.waitToStop(planningJobId, this.helixJobStopTimeoutSeconds * 1000);
-          log.info("Stopped the workflow {}", planningJobId);
+          boolean cancelByDelete = ConfigUtils.getBoolean(this.combinedConfigs, GobblinClusterConfigurationKeys.CANCEL_HELIX_JOB_BY_DELETE,
+              GobblinClusterConfigurationKeys.DEFAULT_CANCEL_HELIX_JOB_BY_DELETE);
+          HelixUtils.cancelWorkflow(planningJobId, this.planningJobHelixManager, helixJobStopTimeoutSeconds * 1000, cancelByDelete);
+          log.info("Canceled the workflow {}", planningJobId);
         }
       } catch (HelixException e) {
-        // Cancellation may throw an exception, but Helix set the job state to STOP and it should eventually stop
+        // Cancellation may throw an exception, but Helix sets the job state to STOP/DELETE and it should eventually be cleaned up
         // We will keep this.cancellationExecuted and this.cancellationRequested to true and not propagate the exception
-        log.error("Failed to stop workflow {} in Helix", planningJobId, e);
+        log.error("Failed to cancel workflow {} in Helix", planningJobId, e);
       } catch (InterruptedException e) {
-        log.error("Thread interrupted while trying to stop the workflow {} in Helix", planningJobId);
+        log.error("Thread interrupted while trying to cancel the workflow {} in Helix", planningJobId);
         Thread.currentThread().interrupt();
       }
     }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 933ec64ce..6308ddc2d 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -255,18 +255,17 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
     if (this.jobSubmitted) {
       try {
         if (this.cancellationRequested && !this.cancellationExecuted) {
-          // TODO : fix this when HELIX-1180 is completed
-          // work flow should never be deleted explicitly because it has a expiry time
-          // If cancellation is requested, we should set the job state to CANCELLED/ABORT
-          this.helixTaskDriver.waitToStop(this.helixWorkFlowName, this.helixJobStopTimeoutSeconds * 1000);
-          log.info("stopped the workflow {}", this.helixWorkFlowName);
+          boolean cancelByDelete = ConfigUtils.getBoolean(jobConfig, GobblinClusterConfigurationKeys.CANCEL_HELIX_JOB_BY_DELETE,
+              GobblinClusterConfigurationKeys.DEFAULT_CANCEL_HELIX_JOB_BY_DELETE);
+          HelixUtils.cancelWorkflow(this.helixWorkFlowName, this.helixManager, helixJobStopTimeoutSeconds * 1000, cancelByDelete);
+          log.info("Canceled the workflow {}", this.helixWorkFlowName);
         }
       } catch (RuntimeException e) {
-        // Cancellation may throw an exception, but Helix set the job state to STOP and it should eventually stop
+        // Cancellation may throw an exception, but Helix sets the job state to STOP/DELETE and it should eventually be cleaned up
         // We will keep this.cancellationExecuted and this.cancellationRequested to true and not propagate the exception
-        log.error("Failed to stop workflow {} in Helix", helixWorkFlowName, e);
+        log.error("Failed to cancel workflow {} in Helix", helixWorkFlowName, e);
       } catch (InterruptedException e) {
-        log.error("Thread interrupted while trying to stop the workflow {} in Helix", helixWorkFlowName);
+        log.error("Thread interrupted while trying to cancel the workflow {} in Helix", helixWorkFlowName);
         Thread.currentThread().interrupt();
       }
     }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 7e43c1689..20f47da3e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -34,7 +34,6 @@ import java.util.concurrent.locks.Lock;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
-import org.apache.helix.task.TaskDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -379,6 +378,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
     Optional<String> distributedJobMode;
     Optional<String> planningJob = Optional.empty();
     Optional<String> actualJob = Optional.empty();
+    boolean cancelByDelete = PropertiesUtils.getPropAsBoolean(this.commonJobProperties, GobblinClusterConfigurationKeys.CANCEL_HELIX_JOB_BY_DELETE,
+        String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_CANCEL_HELIX_JOB_BY_DELETE));
 
     this.jobSchedulerMetrics.numCancellationStart.incrementAndGet();
 
@@ -396,12 +397,12 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
     if (planningJob.isPresent()) {
       LOGGER.info("Cancelling planning job helix workflow: {}", planningJob.get());
-      new TaskDriver(this.taskDriverHelixManager.get()).waitToStop(planningJob.get(), this.helixJobStopTimeoutMillis);
+      HelixUtils.cancelWorkflow(planningJob.get(), this.taskDriverHelixManager.get(), this.helixJobStopTimeoutMillis, cancelByDelete);
     }
 
     if (actualJob.isPresent()) {
       LOGGER.info("Cancelling actual job helix workflow: {}", actualJob.get());
-      new TaskDriver(this.jobHelixManager).waitToStop(actualJob.get(), this.helixJobStopTimeoutMillis);
+      HelixUtils.cancelWorkflow(actualJob.get(), this.jobHelixManager, this.helixJobStopTimeoutMillis, cancelByDelete);
     }
 
     this.jobSchedulerMetrics.numCancellationStart.decrementAndGet();
@@ -430,9 +431,10 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
       if (jobNameToWorkflowIdMap.containsKey(deleteJobArrival.getJobName())) {
         String workflowId = jobNameToWorkflowIdMap.get(deleteJobArrival.getJobName());
-        TaskDriver taskDriver = new TaskDriver(this.jobHelixManager);
-        taskDriver.waitToStop(workflowId, this.helixJobStopTimeoutMillis);
-        LOGGER.info("Stopped workflow: {}", deleteJobArrival.getJobName());
+        boolean cancelByDelete = PropertiesUtils.getPropAsBoolean(jobConfig, GobblinClusterConfigurationKeys.CANCEL_HELIX_JOB_BY_DELETE,
+            String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_CANCEL_HELIX_JOB_BY_DELETE));
+        HelixUtils.cancelWorkflow(workflowId, this.jobHelixManager, helixJobStopTimeoutMillis, cancelByDelete);
+        LOGGER.info("Cancelled workflow: {}", deleteJobArrival.getJobName());
         //Wait until the cancelled job is complete.
         waitForJobCompletion(deleteJobArrival.getJobName());
       } else {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 185726c6b..ce217ed20 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -323,6 +323,19 @@ public class HelixUtils {
     }
   }
 
+  // Cancel workflowName via the Helix Delete API if cancelByDelete, else the Stop API; timeOut is in milliseconds.
+  public static void cancelWorkflow(String workflowName, HelixManager helixManager, long timeOut, boolean cancelByDelete)
+      throws InterruptedException {
+    TaskDriver taskDriver = new TaskDriver(helixManager);
+    if (cancelByDelete) {
+      log.info("Canceling Helix workflow: {} through delete API", workflowName);
+      taskDriver.deleteAndWaitForCompletion(workflowName, timeOut);
+    } else {
+      log.info("Canceling Helix workflow: {} through stop API", workflowName);
+      taskDriver.waitToStop(workflowName, timeOut);
+    }
+  }
+
   static void deleteWorkflow (String workflowName, HelixManager helixManager, long timeOut) throws InterruptedException {
     TaskDriver taskDriver = new TaskDriver(helixManager);
     taskDriver.deleteAndWaitForCompletion(workflowName, timeOut);
@@ -340,10 +353,7 @@ public class HelixUtils {
     } catch (JobException e) {
       throw new RuntimeException("Unable to cancel job " + jobName + ": ", e);
     }
-    // TODO : fix this when HELIX-1180 is completed
-    // We should not be deleting a workflow explicitly.
-    // Workflow state should be set to a final state, which will remove it automatically because expiry time is set.
-    // After that, all delete calls can be replaced by something like HelixUtils.setStateToFinal();
+    // Make sure the job is fully cleaned up
     HelixUtils.deleteStoppedHelixJob(helixManager, workFlowName, jobName);
     log.info("Stopped and deleted the workflow {}", workFlowName);
   }
@@ -358,14 +368,18 @@ public class HelixUtils {
    */
   private static void deleteStoppedHelixJob(HelixManager helixManager, String workFlowName, String jobName)
       throws InterruptedException {
+    long deleteTimeout = 10000L;
     WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
-    while (workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
+    while (workflowContext != null &&
+        workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
       log.info("Waiting for job {} to stop...", jobName);
       workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
       Thread.sleep(1000);
     }
-    // deleting the entire workflow, as one workflow contains only one job
-    new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
+    if (workflowContext != null) {
+      // deleting the entire workflow, as one workflow contains only one job
+      new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, deleteTimeout);
+    }
     log.info("Workflow deleted.");
   }