You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/19 05:14:46 UTC
incubator-gobblin git commit: [GOBBLIN-539] set expiry for helix work flow
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 56be9b230 -> 98fdc504f
[GOBBLIN-539] set expiry for helix work flow
Closes #2402 from arjun4084346/workFlowNameChange
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/98fdc504
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/98fdc504
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/98fdc504
Branch: refs/heads/master
Commit: 98fdc504fd1dd5e14f3f953e0dd928a98d6708df
Parents: 56be9b2
Author: Arjun <ab...@linkedin.com>
Authored: Wed Jul 18 22:15:23 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Jul 18 22:15:23 2018 -0700
----------------------------------------------------------------------
.../GobblinClusterConfigurationKeys.java | 4 +--
...blinHelixDistributeJobExecutionLauncher.java | 10 +++----
.../cluster/GobblinHelixJobLauncher.java | 22 ++++++++++-----
.../org/apache/gobblin/cluster/HelixUtils.java | 28 +++++---------------
4 files changed, 29 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/98fdc504/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index b0badc4..b6c11e3 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -114,8 +114,8 @@ public class GobblinClusterConfigurationKeys {
public static final String STOP_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "stopTimeoutSeconds";
public static final long DEFAULT_STOP_TIMEOUT_SECONDS = 60;
- public static final String HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "jobQueueDeleteTimeoutSeconds";
- public static final long DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = 300;
+ public static final String HELIX_WORKFLOW_EXPIRY_TIME_SECONDS = GOBBLIN_CLUSTER_PREFIX + "workflow.expirySeconds";
+ public static final long DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS = 6 * 60 * 60;
public static final String TASK_RUNNER_SUITE_BUILDER = GOBBLIN_CLUSTER_PREFIX + "taskRunnerSuite.builder";
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/98fdc504/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index e8681b3..f887f0b 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -99,7 +99,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
protected static final String JOB_PROPS_PREFIX = "gobblin.jobProps.";
- private final long jobQueueDeleteTimeoutSeconds;
+ private final long workFlowExpiryTimeSeconds;
private boolean jobSubmitted;
@@ -122,9 +122,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
builder.appWorkDir, PLANNING_WORK_UNIT_DIR_NAME,
builder.appWorkDir, PLANNING_JOB_STATE_DIR_NAME);
- this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(combined,
- GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS,
- GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS);
+ this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(combined,
+ GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
+ GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
}
@Override
@@ -204,7 +204,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
jobId,
taskDriver,
this.helixManager,
- this.jobQueueDeleteTimeoutSeconds);
+ this.workFlowExpiryTimeSeconds);
this.jobSubmitted = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/98fdc504/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 60a5405..cd37342 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -125,7 +125,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
private final ConcurrentHashMap<String, Boolean> runningMap;
private final StateStores stateStores;
private final Config jobConfig;
- private final long jobQueueDeleteTimeoutSeconds;
+ private final long workFlowExpiryTimeSeconds;
public GobblinHelixJobLauncher(Properties jobProps, final HelixManager helixManager, Path appWorkDir,
List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> runningMap)
@@ -141,7 +141,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
this.outputTaskStateDir = new Path(this.appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME
+ Path.SEPARATOR + this.jobContext.getJobId());
- this.helixWorkFlowName = this.jobContext.getJobName();
+ this.helixWorkFlowName = this.jobContext.getJobId();
this.jobResourceName = TaskUtil.getNamespacedJobName(this.helixWorkFlowName, this.jobContext.getJobId());
this.jobContext.getJobState().setJobLauncherType(LauncherTypeEnum.CLUSTER);
@@ -151,9 +151,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
jobConfig = ConfigUtils.propertiesToConfig(jobProps);
- this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(jobConfig,
- GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS,
- GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS);
+ this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(jobConfig,
+ GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
+ GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
.withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef(
@@ -215,7 +215,12 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
if (this.jobSubmitted) {
try {
log.info("[DELETE] workflow {}", this.helixWorkFlowName);
- this.helixTaskDriver.delete(this.helixWorkFlowName);
+ if (this.cancellationRequested) {
+ // TODO : fix this when HELIX-1180 is completed
+ // work flow should never be deleted explicitly because it has a expiry time
+ // If cancellation is requested, we should set the job state to CANCELLED/ABORT
+ this.helixTaskDriver.delete(this.helixWorkFlowName);
+ }
} catch (IllegalArgumentException e) {
LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e);
}
@@ -275,6 +280,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
jobConfigBuilder.setRebalanceRunningTask(true);
}
+ jobConfigBuilder.setExpiry(this.jobContext.getJobState().getPropAsLong(
+ GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS));
+
return jobConfigBuilder;
}
@@ -283,7 +291,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
*/
private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws Exception {
HelixUtils.submitJobToWorkFlow(jobConfigBuilder, this.helixWorkFlowName, this.jobContext.getJobId(),
- this.helixTaskDriver, this.helixManager, this.jobQueueDeleteTimeoutSeconds);
+ this.helixTaskDriver, this.helixManager, this.workFlowExpiryTimeSeconds);
}
public void launchJob(@Nullable JobListener jobListener)
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/98fdc504/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index e124e81..1a11ee3 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.cluster;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.helix.HelixManager;
@@ -106,29 +107,14 @@ public class HelixUtils {
String jobName,
TaskDriver helixTaskDriver,
HelixManager helixManager,
- long jobQueueDeleteTimeoutSeconds) throws Exception {
-
- WorkflowConfig workflowConfig = helixTaskDriver.getWorkflowConfig(helixManager, workFlowName);
-
- log.info("[DELETE] workflow {} in the beginning", workFlowName);
- // If the queue is present, but in delete state then wait for cleanup before recreating the queue
- if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) {
- // We want synchronous delete otherwise state can be deleted after we create it below due to race condition.
- new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, jobQueueDeleteTimeoutSeconds);
- // if we get here then the workflow was successfully deleted
- workflowConfig = null;
- }
+ long workFlowExpiryTime) throws Exception {
+ WorkflowConfig workFlowConfig = new WorkflowConfig.Builder().setExpiry(workFlowExpiryTime, TimeUnit.SECONDS).build();
// Create a work flow for each job with the name being the queue name
- if (workflowConfig == null) {
- // Create a workflow and add the job
- Workflow workflow = new Workflow.Builder(workFlowName).addJob(jobName, jobConfigBuilder).build();
- // start the workflow
- helixTaskDriver.start(workflow);
- log.info("Created a work flow {}", workFlowName);
- } else {
- log.info("Work flow {} already exists", workFlowName);
- }
+ Workflow workFlow = new Workflow.Builder(workFlowName).setWorkflowConfig(workFlowConfig).addJob(jobName, jobConfigBuilder).build();
+ // start the workflow
+ helixTaskDriver.start(workFlow);
+ log.info("Created a work flow {}", workFlowName);
}
public static void waitJobCompletion(