You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/19 05:14:46 UTC

incubator-gobblin git commit: [GOBBLIN-539] set expiry for helix work flow

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 56be9b230 -> 98fdc504f


[GOBBLIN-539] set expiry for helix work flow

Closes #2402 from arjun4084346/workFlowNameChange


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/98fdc504
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/98fdc504
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/98fdc504

Branch: refs/heads/master
Commit: 98fdc504fd1dd5e14f3f953e0dd928a98d6708df
Parents: 56be9b2
Author: Arjun <ab...@linkedin.com>
Authored: Wed Jul 18 22:15:23 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Jul 18 22:15:23 2018 -0700

----------------------------------------------------------------------
 .../GobblinClusterConfigurationKeys.java        |  4 +--
 ...blinHelixDistributeJobExecutionLauncher.java | 10 +++----
 .../cluster/GobblinHelixJobLauncher.java        | 22 ++++++++++-----
 .../org/apache/gobblin/cluster/HelixUtils.java  | 28 +++++---------------
 4 files changed, 29 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/98fdc504/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index b0badc4..b6c11e3 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -114,8 +114,8 @@ public class GobblinClusterConfigurationKeys {
   public static final String STOP_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "stopTimeoutSeconds";
   public static final long DEFAULT_STOP_TIMEOUT_SECONDS = 60;
 
-  public static final String HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "jobQueueDeleteTimeoutSeconds";
-  public static final long DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = 300;
+  public static final String HELIX_WORKFLOW_EXPIRY_TIME_SECONDS = GOBBLIN_CLUSTER_PREFIX + "workflow.expirySeconds";
+  public static final long DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS = 6 * 60 * 60;
 
   public static final String TASK_RUNNER_SUITE_BUILDER = GOBBLIN_CLUSTER_PREFIX + "taskRunnerSuite.builder";
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/98fdc504/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index e8681b3..f887f0b 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -99,7 +99,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
 
   protected static final String JOB_PROPS_PREFIX = "gobblin.jobProps.";
 
-  private final long jobQueueDeleteTimeoutSeconds;
+  private final long workFlowExpiryTimeSeconds;
 
   private boolean jobSubmitted;
 
@@ -122,9 +122,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
         builder.appWorkDir, PLANNING_WORK_UNIT_DIR_NAME,
         builder.appWorkDir, PLANNING_JOB_STATE_DIR_NAME);
 
-    this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(combined,
-        GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS,
-        GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS);
+    this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(combined,
+        GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
+        GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
   }
 
   @Override
@@ -204,7 +204,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
         jobId,
         taskDriver,
         this.helixManager,
-        this.jobQueueDeleteTimeoutSeconds);
+        this.workFlowExpiryTimeSeconds);
     this.jobSubmitted = true;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/98fdc504/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 60a5405..cd37342 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -125,7 +125,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
   private final ConcurrentHashMap<String, Boolean> runningMap;
   private final StateStores stateStores;
   private final Config jobConfig;
-  private final long jobQueueDeleteTimeoutSeconds;
+  private final long workFlowExpiryTimeSeconds;
 
   public GobblinHelixJobLauncher(Properties jobProps, final HelixManager helixManager, Path appWorkDir,
       List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> runningMap)
@@ -141,7 +141,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
     this.outputTaskStateDir = new Path(this.appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME
         + Path.SEPARATOR + this.jobContext.getJobId());
 
-    this.helixWorkFlowName = this.jobContext.getJobName();
+    this.helixWorkFlowName = this.jobContext.getJobId();
     this.jobResourceName = TaskUtil.getNamespacedJobName(this.helixWorkFlowName, this.jobContext.getJobId());
 
     this.jobContext.getJobState().setJobLauncherType(LauncherTypeEnum.CLUSTER);
@@ -151,9 +151,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
 
     jobConfig = ConfigUtils.propertiesToConfig(jobProps);
 
-    this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(jobConfig,
-        GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS,
-        GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS);
+    this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(jobConfig,
+        GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
+        GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
 
     Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
         .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef(
@@ -215,7 +215,12 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
     if (this.jobSubmitted) {
       try {
         log.info("[DELETE] workflow {}", this.helixWorkFlowName);
-        this.helixTaskDriver.delete(this.helixWorkFlowName);
+        if (this.cancellationRequested) {
+          // TODO : fix this when HELIX-1180 is completed
+          // work flow should never be deleted explicitly because it has an expiry time
+          // If cancellation is requested, we should set the job state to CANCELLED/ABORT
+          this.helixTaskDriver.delete(this.helixWorkFlowName);
+        }
       } catch (IllegalArgumentException e) {
         LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e);
       }
@@ -275,6 +280,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
       jobConfigBuilder.setRebalanceRunningTask(true);
     }
 
+    jobConfigBuilder.setExpiry(this.jobContext.getJobState().getPropAsLong(
+        GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS));
+
     return jobConfigBuilder;
   }
 
@@ -283,7 +291,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
    */
   private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws Exception {
     HelixUtils.submitJobToWorkFlow(jobConfigBuilder, this.helixWorkFlowName, this.jobContext.getJobId(),
-        this.helixTaskDriver, this.helixManager, this.jobQueueDeleteTimeoutSeconds);
+        this.helixTaskDriver, this.helixManager, this.workFlowExpiryTimeSeconds);
   }
 
   public void launchJob(@Nullable JobListener jobListener)

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/98fdc504/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index e124e81..1a11ee3 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.cluster;
 
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.helix.HelixManager;
@@ -106,29 +107,14 @@ public class HelixUtils {
       String jobName,
       TaskDriver helixTaskDriver,
       HelixManager helixManager,
-      long jobQueueDeleteTimeoutSeconds) throws Exception {
-
-    WorkflowConfig workflowConfig = helixTaskDriver.getWorkflowConfig(helixManager, workFlowName);
-
-    log.info("[DELETE] workflow {} in the beginning", workFlowName);
-    // If the queue is present, but in delete state then wait for cleanup before recreating the queue
-    if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) {
-      // We want synchronous delete otherwise state can be deleted after we create it below due to race condition.
-      new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, jobQueueDeleteTimeoutSeconds);
-      // if we get here then the workflow was successfully deleted
-      workflowConfig = null;
-    }
+      long workFlowExpiryTime) throws Exception {
 
+    WorkflowConfig workFlowConfig = new WorkflowConfig.Builder().setExpiry(workFlowExpiryTime, TimeUnit.SECONDS).build();
     // Create a work flow for each job with the name being the queue name
-    if (workflowConfig == null) {
-      // Create a workflow and add the job
-      Workflow workflow = new Workflow.Builder(workFlowName).addJob(jobName, jobConfigBuilder).build();
-      // start the workflow
-      helixTaskDriver.start(workflow);
-      log.info("Created a work flow {}", workFlowName);
-    } else {
-      log.info("Work flow {} already exists", workFlowName);
-    }
+    Workflow workFlow = new Workflow.Builder(workFlowName).setWorkflowConfig(workFlowConfig).addJob(jobName, jobConfigBuilder).build();
+    // start the workflow
+    helixTaskDriver.start(workFlow);
+    log.info("Created a work flow {}", workFlowName);
   }
 
   public static void waitJobCompletion(