You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/10/24 22:46:11 UTC
[gobblin] branch master updated: give option to cancel helix workflow through Delete API (#3580)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7a6dcfc8f give option to cancel helix workflow through Delete API (#3580)
7a6dcfc8f is described below
commit 7a6dcfc8f98229914c75d38d480b6e8af80a1085
Author: Hanghang Nate Liu <na...@gmail.com>
AuthorDate: Mon Oct 24 15:46:03 2022 -0700
give option to cancel helix workflow through Delete API (#3580)
change log string
change all waitToStop to new cancel method
update imports
address comments
checkstyle
---
.../cluster/GobblinClusterConfigurationKeys.java | 6 +++++
...GobblinHelixDistributeJobExecutionLauncher.java | 25 +++++++++----------
.../gobblin/cluster/GobblinHelixJobLauncher.java | 15 ++++++------
.../gobblin/cluster/GobblinHelixJobScheduler.java | 14 ++++++-----
.../org/apache/gobblin/cluster/HelixUtils.java | 28 ++++++++++++++++------
5 files changed, 55 insertions(+), 33 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 4ae21c1d3..d2a60781d 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -175,6 +175,12 @@ public class GobblinClusterConfigurationKeys {
public static final String CANCEL_RUNNING_JOB_ON_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.cancelRunningJobOnDelete";
public static final String DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE = "false";
+ // By default we cancel jobs by calling the Helix stop API. In some cases, jobs hang in the STOPPING state and prevent
+ // new jobs from being launched. This config gives the option to cancel jobs by calling the Helix delete API instead. Deleting
+ // a Helix workflow directly should be safe for Gobblin, because Gobblin jobs are stateless from Helix's point of view (Gobblin keeps its own state store).
+ public static final String CANCEL_HELIX_JOB_BY_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.cancelHelixJobByDelete";
+ public static final boolean DEFAULT_CANCEL_HELIX_JOB_BY_DELETE = false;
+
public static final String HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "job.stoppingStateTimeoutSeconds";
public static final long DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS = 300;
public static final String CONTAINER_HEALTH_METRICS_SERVICE_ENABLED = GOBBLIN_CLUSTER_PREFIX + "container.health.metrics.service.enabled" ;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index 2f0322549..f2ba083c5 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -110,6 +110,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
@Getter
private DistributeJobMonitor jobMonitor;
+ private final Config combinedConfigs;
+
public GobblinHelixDistributeJobExecutionLauncher(Builder builder) {
this.planningJobHelixManager = builder.planningJobHelixManager;
@@ -118,19 +120,19 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
this.jobPlanningProps = builder.jobPlanningProps;
this.jobSubmitted = false;
- Config combined = ConfigUtils.propertiesToConfig(jobPlanningProps)
+ combinedConfigs = ConfigUtils.propertiesToConfig(jobPlanningProps)
.withFallback(ConfigUtils.propertiesToConfig(sysProps));
- this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(combined,
+ this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(this.combinedConfigs,
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
this.planningJobLauncherMetrics = builder.planningJobLauncherMetrics;
- this.nonBlockingMode = ConfigUtils.getBoolean(combined,
+ this.nonBlockingMode = ConfigUtils.getBoolean(this.combinedConfigs,
GobblinClusterConfigurationKeys.NON_BLOCKING_PLANNING_JOB_ENABLED,
GobblinClusterConfigurationKeys.DEFAULT_NON_BLOCKING_PLANNING_JOB_ENABLED);
this.helixMetrics = builder.helixMetrics;
this.jobsMapping = builder.jobsMapping;
- this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(combined,
+ this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(this.combinedConfigs,
GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS);
}
@@ -144,18 +146,17 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
String planningJobId = getPlanningJobId(this.jobPlanningProps);
try {
if (this.cancellationRequested && !this.cancellationExecuted) {
- // TODO : fix this when HELIX-1180 is completed
- // work flow should never be deleted explicitly because it has a expiry time
- // If cancellation is requested, we should set the job state to CANCELLED/ABORT
- this.helixTaskDriver.waitToStop(planningJobId, this.helixJobStopTimeoutSeconds * 1000);
- log.info("Stopped the workflow {}", planningJobId);
+ boolean cancelByDelete = ConfigUtils.getBoolean(this.combinedConfigs, GobblinClusterConfigurationKeys.CANCEL_HELIX_JOB_BY_DELETE,
+ GobblinClusterConfigurationKeys.DEFAULT_CANCEL_HELIX_JOB_BY_DELETE);
+ HelixUtils.cancelWorkflow(planningJobId, this.planningJobHelixManager, helixJobStopTimeoutSeconds * 1000, cancelByDelete);
+ log.info("Canceled the workflow {}", planningJobId);
}
} catch (HelixException e) {
- // Cancellation may throw an exception, but Helix set the job state to STOP and it should eventually stop
+ // Cancellation may throw an exception, but Helix has already set the job state to STOP/DELETE, so it should eventually be cleaned up
// We will keep this.cancellationExecuted and this.cancellationRequested to true and not propagate the exception
- log.error("Failed to stop workflow {} in Helix", planningJobId, e);
+ log.error("Failed to cancel workflow {} in Helix", planningJobId, e);
} catch (InterruptedException e) {
- log.error("Thread interrupted while trying to stop the workflow {} in Helix", planningJobId);
+ log.error("Thread interrupted while trying to cancel the workflow {} in Helix", planningJobId);
Thread.currentThread().interrupt();
}
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 933ec64ce..6308ddc2d 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -255,18 +255,17 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
if (this.jobSubmitted) {
try {
if (this.cancellationRequested && !this.cancellationExecuted) {
- // TODO : fix this when HELIX-1180 is completed
- // work flow should never be deleted explicitly because it has a expiry time
- // If cancellation is requested, we should set the job state to CANCELLED/ABORT
- this.helixTaskDriver.waitToStop(this.helixWorkFlowName, this.helixJobStopTimeoutSeconds * 1000);
- log.info("stopped the workflow {}", this.helixWorkFlowName);
+ boolean cancelByDelete = ConfigUtils.getBoolean(jobConfig, GobblinClusterConfigurationKeys.CANCEL_HELIX_JOB_BY_DELETE,
+ GobblinClusterConfigurationKeys.DEFAULT_CANCEL_HELIX_JOB_BY_DELETE);
+ HelixUtils.cancelWorkflow(this.helixWorkFlowName, this.helixManager, helixJobStopTimeoutSeconds * 1000, cancelByDelete);
+ log.info("Canceled the workflow {}", this.helixWorkFlowName);
}
} catch (RuntimeException e) {
- // Cancellation may throw an exception, but Helix set the job state to STOP and it should eventually stop
+ // Cancellation may throw an exception, but Helix has already set the job state to STOP/DELETE, so it should eventually be cleaned up
// We will keep this.cancellationExecuted and this.cancellationRequested to true and not propagate the exception
- log.error("Failed to stop workflow {} in Helix", helixWorkFlowName, e);
+ log.error("Failed to cancel workflow {} in Helix", helixWorkFlowName, e);
} catch (InterruptedException e) {
- log.error("Thread interrupted while trying to stop the workflow {} in Helix", helixWorkFlowName);
+ log.error("Thread interrupted while trying to cancel the workflow {} in Helix", helixWorkFlowName);
Thread.currentThread().interrupt();
}
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 7e43c1689..20f47da3e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -34,7 +34,6 @@ import java.util.concurrent.locks.Lock;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
-import org.apache.helix.task.TaskDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -379,6 +378,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
Optional<String> distributedJobMode;
Optional<String> planningJob = Optional.empty();
Optional<String> actualJob = Optional.empty();
+ boolean cancelByDelete = PropertiesUtils.getPropAsBoolean(this.commonJobProperties, GobblinClusterConfigurationKeys.CANCEL_HELIX_JOB_BY_DELETE,
+ String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_CANCEL_HELIX_JOB_BY_DELETE));
this.jobSchedulerMetrics.numCancellationStart.incrementAndGet();
@@ -396,12 +397,12 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
if (planningJob.isPresent()) {
LOGGER.info("Cancelling planning job helix workflow: {}", planningJob.get());
- new TaskDriver(this.taskDriverHelixManager.get()).waitToStop(planningJob.get(), this.helixJobStopTimeoutMillis);
+ HelixUtils.cancelWorkflow(planningJob.get(), this.taskDriverHelixManager.get(), this.helixJobStopTimeoutMillis, cancelByDelete);
}
if (actualJob.isPresent()) {
LOGGER.info("Cancelling actual job helix workflow: {}", actualJob.get());
- new TaskDriver(this.jobHelixManager).waitToStop(actualJob.get(), this.helixJobStopTimeoutMillis);
+ HelixUtils.cancelWorkflow(actualJob.get(), this.jobHelixManager, this.helixJobStopTimeoutMillis, cancelByDelete);
}
this.jobSchedulerMetrics.numCancellationStart.decrementAndGet();
@@ -430,9 +431,10 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
if (jobNameToWorkflowIdMap.containsKey(deleteJobArrival.getJobName())) {
String workflowId = jobNameToWorkflowIdMap.get(deleteJobArrival.getJobName());
- TaskDriver taskDriver = new TaskDriver(this.jobHelixManager);
- taskDriver.waitToStop(workflowId, this.helixJobStopTimeoutMillis);
- LOGGER.info("Stopped workflow: {}", deleteJobArrival.getJobName());
+ boolean cancelByDelete = PropertiesUtils.getPropAsBoolean(jobConfig, GobblinClusterConfigurationKeys.CANCEL_HELIX_JOB_BY_DELETE,
+ String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_CANCEL_HELIX_JOB_BY_DELETE));
+ HelixUtils.cancelWorkflow(workflowId, this.jobHelixManager, helixJobStopTimeoutMillis, cancelByDelete);
+ LOGGER.info("Cancelled workflow: {}", deleteJobArrival.getJobName());
//Wait until the cancelled job is complete.
waitForJobCompletion(deleteJobArrival.getJobName());
} else {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 185726c6b..ce217ed20 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -323,6 +323,19 @@ public class HelixUtils {
}
}
+ // Cancel the workflow via the Helix delete API (cancelByDelete = true) or the stop API (false), waiting up to timeOut ms
+ public static void cancelWorkflow(String workflowName, HelixManager helixManager, long timeOut, boolean cancelByDelete)
+ throws InterruptedException {
+ TaskDriver taskDriver = new TaskDriver(helixManager);
+ if (cancelByDelete) {
+ taskDriver.deleteAndWaitForCompletion(workflowName, timeOut);
+ log.info("Canceling Helix workflow: {} through delete API", workflowName);
+ } else {
+ taskDriver.waitToStop(workflowName, timeOut);
+ log.info("Canceling Helix workflow: {} through stop API", workflowName);
+ }
+ }
+
static void deleteWorkflow (String workflowName, HelixManager helixManager, long timeOut) throws InterruptedException {
TaskDriver taskDriver = new TaskDriver(helixManager);
taskDriver.deleteAndWaitForCompletion(workflowName, timeOut);
@@ -340,10 +353,7 @@ public class HelixUtils {
} catch (JobException e) {
throw new RuntimeException("Unable to cancel job " + jobName + ": ", e);
}
- // TODO : fix this when HELIX-1180 is completed
- // We should not be deleting a workflow explicitly.
- // Workflow state should be set to a final state, which will remove it automatically because expiry time is set.
- // After that, all delete calls can be replaced by something like HelixUtils.setStateToFinal();
+ // Make sure the job is fully cleaned up
HelixUtils.deleteStoppedHelixJob(helixManager, workFlowName, jobName);
log.info("Stopped and deleted the workflow {}", workFlowName);
}
@@ -358,14 +368,18 @@ public class HelixUtils {
*/
private static void deleteStoppedHelixJob(HelixManager helixManager, String workFlowName, String jobName)
throws InterruptedException {
+ long deleteTimeout = 10000L;
WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
- while (workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
+ while (workflowContext != null &&
+ workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
log.info("Waiting for job {} to stop...", jobName);
workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
Thread.sleep(1000);
}
- // deleting the entire workflow, as one workflow contains only one job
- new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
+ if (workflowContext != null) {
+ // deleting the entire workflow, as one workflow contains only one job
+ new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, deleteTimeout);
+ }
log.info("Workflow deleted.");
}