You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/14 00:36:56 UTC

incubator-gobblin git commit: [GOBBLIN-535] Add second hop for distributed job launcher

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 6232b416a -> 45fc9dfca


[GOBBLIN-535] Add second hop for distributed job launcher

Closes #2398 from yukuai518/hop2


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/45fc9dfc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/45fc9dfc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/45fc9dfc

Branch: refs/heads/master
Commit: 45fc9dfca69dd8479504ee6b9878d196c722a329
Parents: 6232b41
Author: Kuai Yu <ku...@linkedin.com>
Authored: Fri Jul 13 17:36:51 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Jul 13 17:36:51 2018 -0700

----------------------------------------------------------------------
 ...blinHelixDistributeJobExecutionLauncher.java | 20 +++++-
 .../gobblin/cluster/GobblinHelixJobFactory.java | 17 +++--
 .../cluster/GobblinHelixJobLauncher.java        | 10 ++-
 .../gobblin/cluster/GobblinHelixJobTask.java    | 74 ++++++++++++++------
 .../gobblin/cluster/GobblinHelixTask.java       | 13 ++--
 .../gobblin/cluster/GobblinTaskRunner.java      | 12 ++--
 .../cluster/HelixRetriggeringJobCallable.java   | 52 +++++++++++---
 .../org/apache/gobblin/cluster/HelixUtils.java  |  1 +
 .../gobblin/cluster/TaskRunnerSuiteBase.java    | 24 +++++++
 .../cluster/TaskRunnerSuiteThreadModel.java     |  4 +-
 .../TaskRunnerSuiteForJobFactoryTest.java       | 10 +--
 .../suite/IntegrationJobFactorySuite.java       |  3 +-
 .../gobblin/runtime/api/ExecutionResult.java    |  2 +-
 13 files changed, 184 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index 87613a6..7ef24fc 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.cluster;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
@@ -85,7 +86,7 @@ import org.apache.gobblin.util.PropertiesUtils;
  */
 @Alpha
 @Slf4j
-class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher {
+class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher, Closeable {
   protected HelixManager helixManager;
   protected TaskDriver helixTaskDriver;
   protected Properties sysProperties;
@@ -100,12 +101,14 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
 
   private final long jobQueueDeleteTimeoutSeconds;
 
+  private boolean jobSubmitted;
+
   public GobblinHelixDistributeJobExecutionLauncher(Builder builder) throws Exception {
     this.helixManager = builder.manager;
     this.helixTaskDriver = new TaskDriver(this.helixManager);
     this.sysProperties = builder.sysProperties;
     this.jobProperties = builder.jobProperties;
-
+    this.jobSubmitted = false;
     Config combined = ConfigUtils.propertiesToConfig(jobProperties)
         .withFallback(ConfigUtils.propertiesToConfig(sysProperties));
 
@@ -124,6 +127,17 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
         GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS);
   }
 
+  @Override
+  public void close()
+      throws IOException {
+    // we should delete the planning job at the end.
+    if (this.jobSubmitted) {
+      String planningName = getPlanningJobName(this.jobProperties);
+      log.info("[DELETE] workflow {} in the close.", planningName);
+      this.helixTaskDriver.delete(planningName);
+    }
+  }
+
   @Setter
   public static class Builder {
     Properties sysProperties;
@@ -191,6 +205,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
         taskDriver,
         this.helixManager,
         this.jobQueueDeleteTimeoutSeconds);
+    this.jobSubmitted = true;
   }
 
   @Override
@@ -232,6 +247,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
       return getResultFromUserContent();
     } catch (TimeoutException te) {
       helixTaskDriver.waitToStop(planningName, 10L);
+      log.info("[DELETE] workflow {} timeout.", planningName);
       this.helixTaskDriver.delete(planningName);
       this.helixTaskDriver.resume(planningName);
       log.info("stopped the queue, deleted the job");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
index 2f7ced2..83821d4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
@@ -39,11 +39,12 @@ import org.apache.gobblin.util.PathUtils;
  */
 @Slf4j
 public class GobblinHelixJobFactory implements TaskFactory {
-  protected Config sysConfig;
   protected StateStores stateStores;
 
-  public GobblinHelixJobFactory(TaskRunnerSuiteBase.Builder builder) {
-    this.sysConfig = builder.getConfig();
+  protected TaskRunnerSuiteBase.Builder builder;
+
+  private void initializeStateStore(TaskRunnerSuiteBase.Builder builder) {
+    Config sysConfig = builder.getConfig();
     Path appWorkDir = builder.getAppWorkPath();
     URI rootPathUri = PathUtils.getRootPath(appWorkDir).toUri();
     Config stateStoreJobConfig = sysConfig
@@ -56,8 +57,16 @@ public class GobblinHelixJobFactory implements TaskFactory {
         appWorkDir, GobblinHelixDistributeJobExecutionLauncher.PLANNING_JOB_STATE_DIR_NAME);
   }
 
+  public GobblinHelixJobFactory(TaskRunnerSuiteBase.Builder builder) {
+    this.builder = builder;
+    // TODO: We can remove the initialization below once Helix allows us to persist job results in userContentStore
+    initializeStateStore(this.builder);
+  }
+
   @Override
   public Task createNewTask(TaskCallbackContext context) {
-    return new GobblinHelixJobTask(context, this.sysConfig, stateStores);
+    return new GobblinHelixJobTask(context,
+        this.stateStores,
+        this.builder);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 39c6e5b..8794a34 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -45,6 +45,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
 import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -96,6 +97,7 @@ import org.apache.gobblin.util.SerializationUtils;
  * @author Yinan Li
  */
 @Alpha
+@Slf4j
 public class GobblinHelixJobLauncher extends AbstractJobLauncher {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixJobLauncher.class);
@@ -136,8 +138,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
     this.runningMap = runningMap;
     this.appWorkDir = appWorkDir;
     this.inputWorkUnitDir = new Path(appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME);
-    this.outputTaskStateDir = new Path(this.appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME +
-        Path.SEPARATOR + this.jobContext.getJobId());
+    this.outputTaskStateDir = new Path(this.appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME
+        + Path.SEPARATOR + this.jobContext.getJobId());
 
     this.helixQueueName = this.jobContext.getJobName();
     this.jobResourceName = TaskUtil.getNamespacedJobName(this.helixQueueName, this.jobContext.getJobId());
@@ -149,7 +151,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
 
     jobConfig = ConfigUtils.propertiesToConfig(jobProps);
 
-    this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(jobConfig, GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS,
+    this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(jobConfig,
+        GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS,
         GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS);
 
     Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
@@ -211,6 +214,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
   protected void executeCancellation() {
     if (this.jobSubmitted) {
       try {
+        log.info("[DELETE] workflow {}", this.helixQueueName);
         this.helixTaskDriver.delete(this.helixQueueName);
       } catch (IllegalArgumentException e) {
         LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
index f60852e..9ede090 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -18,9 +18,14 @@
 package org.apache.gobblin.cluster;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixManager;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConfig;
@@ -28,61 +33,85 @@ import org.apache.helix.task.TaskResult;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.util.StateStores;
 import org.apache.gobblin.source.extractor.partition.Partitioner;
 import org.apache.gobblin.util.ConfigUtils;
 
 /**
- * An implementation of Helix's {@link org.apache.helix.task.Task} that runs original {@link GobblinHelixJobLauncher}
+ * An implementation of Helix's {@link org.apache.helix.task.Task} that runs original {@link GobblinHelixJobLauncher}.
  */
 @Slf4j
 public class GobblinHelixJobTask implements Task {
 
   private final TaskConfig taskConfig;
-  private Config sysConfig;
-  private Properties jobConfig;
-  private StateStores stateStores;
-  private String planningJobId;
-
+  private final Config sysConfig;
+  private final Properties jobPlusSysConfig;
+  private final StateStores stateStores;
+  private final String planningJobId;
+  private final HelixManager helixManager;
+  private final Path appWorkDir;
+  private final List<? extends Tag<?>> metadataTags;
+
+  private GobblinHelixJobLauncher launcher;
   public GobblinHelixJobTask(TaskCallbackContext context,
-      Config sysConfig,
-      StateStores stateStores) {
+      StateStores stateStores,
+      TaskRunnerSuiteBase.Builder builder) {
     this.taskConfig = context.getTaskConfig();
-    this.sysConfig = sysConfig;
-    this.jobConfig = ConfigUtils.configToProperties(sysConfig);
+    this.sysConfig = builder.getConfig();
+    this.helixManager = builder.getHelixManager();
+    this.jobPlusSysConfig = ConfigUtils.configToProperties(sysConfig);
     Map<String, String> configMap = this.taskConfig.getConfigMap();
     for (Map.Entry<String, String> entry: configMap.entrySet()) {
       if (entry.getKey().startsWith(GobblinHelixDistributeJobExecutionLauncher.JOB_PROPS_PREFIX)) {
           String key = entry.getKey().replaceFirst(GobblinHelixDistributeJobExecutionLauncher.JOB_PROPS_PREFIX, "");
-          jobConfig.put(key, entry.getValue());
+        jobPlusSysConfig.put(key, entry.getValue());
       }
     }
 
-    if (!jobConfig.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY)) {
+    if (!jobPlusSysConfig.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY)) {
       throw new RuntimeException("Job doesn't have plannning ID");
     }
 
-    this.planningJobId = jobConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
+    this.planningJobId = jobPlusSysConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
     this.stateStores = stateStores;
+    this.appWorkDir = builder.getAppWorkPath();
+    this.metadataTags = Tag.fromMap(new ImmutableMap.Builder<String, Object>()
+        .put(GobblinClusterMetricTagNames.APPLICATION_NAME, builder.getApplicationName())
+        .put(GobblinClusterMetricTagNames.APPLICATION_ID, builder.getApplicationId())
+        .build());
   }
 
-  @Override
-  public TaskResult run() {
-    log.info("We will run planning job " + this.planningJobId);
 
-    // TODO: We should run GobblinHelixJobLauncher#launchJob() here
+  private GobblinHelixJobLauncher createJobLauncher()
+      throws Exception {
+    return new GobblinHelixJobLauncher(jobPlusSysConfig,
+        this.helixManager,
+        this.appWorkDir,
+        this.metadataTags,
+        new ConcurrentHashMap<>());
+  }
 
-    try {
+  @Override
+  public TaskResult run() {
+    log.info("Running planning job {}", this.planningJobId);
+    // Launch the job
+    try (Closer closer = Closer.create()) {
+      this.launcher = createJobLauncher();
+      //TODO: we will provide additional listener
+      closer.register(launcher).launchJob(null);
       setResultToUserContent(ImmutableMap.of(Partitioner.IS_EARLY_STOPPED, "false"));
-    } catch (IOException e) {
-      return new TaskResult(TaskResult.Status.FAILED, "State store cannot be persisted for job " + planningJobId);
+    } catch (Exception e) {
+      return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for job " + planningJobId + ":" + ExceptionUtils
+          .getFullStackTrace(e));
     }
     return new TaskResult(TaskResult.Status.COMPLETED, "");
   }
@@ -102,6 +131,9 @@ public class GobblinHelixJobTask implements Task {
 
   @Override
   public void cancel() {
-    // TODO: We should delete the real job.
+    log.info("Cancelling planning job {}", this.planningJobId);
+    if (launcher != null) {
+      launcher.executeCancellation();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index e651b8e..a2516eb 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -26,13 +26,13 @@ import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
 import com.google.common.base.Throwables;
 import com.google.common.io.Closer;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.TaskState;
@@ -58,14 +58,14 @@ import org.apache.gobblin.util.Id;
  * </p>
  */
 @Alpha
+@Slf4j
 public class GobblinHelixTask implements Task {
 
-  private static final Logger _logger = LoggerFactory.getLogger(GobblinHelixTask.class);
-
   private final TaskConfig taskConfig;
   private String jobName;
   private String jobId;
   private String jobKey;
+  private String taskId;
   private Path workUnitFilePath;
 
   private SingleTask task;
@@ -90,22 +90,25 @@ public class GobblinHelixTask implements Task {
     this.jobName = configMap.get(ConfigurationKeys.JOB_NAME_KEY);
     this.jobId = configMap.get(ConfigurationKeys.JOB_ID_KEY);
     this.jobKey = Long.toString(Id.parse(this.jobId).getSequence());
+    this.taskId = configMap.get(ConfigurationKeys.TASK_ID_KEY);
     this.workUnitFilePath =
         new Path(configMap.get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH));
   }
 
   @Override
   public TaskResult run() {
+    log.info("Actual task {} started.", this.taskId);
     try (Closer closer = Closer.create()) {
       closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY, this.jobName));
       closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY, this.jobKey));
       this.task.run();
+      log.info("Actual task {} finished.", this.taskId);
       return new TaskResult(TaskResult.Status.COMPLETED, "");
     } catch (InterruptedException ie) {
       Thread.currentThread().interrupt();
       return new TaskResult(TaskResult.Status.CANCELED, "");
     } catch (Throwable t) {
-      _logger.error("GobblinHelixTask failed due to " + t.getMessage(), t);
+      log.error("GobblinHelixTask " + taskId + " failed due to " + t.getMessage(), t);
       return new TaskResult(TaskResult.Status.FAILED, Throwables.getStackTraceAsString(t));
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index fb7136d..9d0e6cc 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -89,9 +89,10 @@ import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER
  * {@link org.apache.gobblin.source.workunit.WorkUnit}s.
  *
  * <p>
- *   This class serves as a Helix participant and it uses a {@link HelixManager} to work with Helix.
- *   This class also uses the Helix task execution framework and {@link GobblinHelixTaskFactory} class
- *   for creating {@link GobblinHelixTask}s that Helix manages to run Gobblin data ingestion tasks.
+ *   This class acts as a Helix participant and uses a {@link HelixManager} to communicate with Helix.
+ *   It also uses the Helix task execution framework and the {@link GobblinHelixTaskFactory} class to create
+ *   {@link GobblinHelixTask}s, which run the actual Gobblin tasks. All Helix-related task framework logic is
+ *   encapsulated in {@link TaskRunnerSuiteBase}.
  * </p>
  *
  * <p>
@@ -174,7 +175,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     TaskRunnerSuiteBase suite = builder.setAppWorkPath(this.appWorkPath)
         .setContainerMetrics(this.containerMetrics)
         .setFileSystem(this.fs)
-        .setHelixManager(this.helixManager).build();
+        .setHelixManager(this.helixManager)
+        .setApplicationId(applicationId)
+        .setApplicationName(applicationName)
+        .build();
 
     this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactoryMap());
     this.metrics = suite.getTaskMetrics();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index ce3619b..7b9fd3c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -23,6 +23,8 @@ import java.util.concurrent.Callable;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 
+import com.google.common.io.Closer;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
@@ -38,7 +40,31 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
 /**
- * A {@link Callable} that runs {@link JobLauncher} multiple times iff re-triggering is enabled and job stops early.
+ * A {@link Callable} that can run a given job multiple times iff:
+ *  1) Re-triggering is enabled and
+ *  2) Job stops early.
+ *
+ * Moreover based on the job properties, a job can be processed immediately (non-distributed) or forwarded to a remote
+ * node (distributed) for handling. Details are illustrated as follows:
+ *
+ * <p>
+ *   If {@link GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is false, the job will be handled
+ *   by {@link HelixRetriggeringJobCallable#launchJobLauncherLoop()}, which simply submits the job to Helix for execution.
+ *
+ *   See {@link GobblinHelixJobLauncher} for job launcher details.
+ * </p>
+ *
+ * <p>
+ *   If {@link GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is true, the job will be handled
+ *   by {@link HelixRetriggeringJobCallable#launchJobExecutionLauncherLoop()}. It will first create a planning job with
+ *   {@link GobblinTaskRunner#GOBBLIN_JOB_FACTORY_NAME} pre-configured, so that Helix can forward this planning job to
+ *   any node that provides a Helix task factory registered under the same name. See the {@link TaskRunnerSuiteThreadModel}
+ *   implementation for how the task factory model is set up.
+ *
+ *   Once the planning job reaches the remote node, it is handled by a {@link GobblinHelixJobTask}, which is
+ *   created by {@link GobblinHelixJobFactory}. The actual handling is similar to the non-distributed mode, where
+ *   {@link GobblinHelixJobLauncher} is invoked.
+ * </p>
  */
 @Slf4j
 @Alpha
@@ -123,16 +149,22 @@ class HelixRetriggeringJobCallable implements Callable {
         builder.setManager(this.helixManager);
         builder.setAppWorkDir(this.appWorkDir);
 
-        this.currentJobMonitor = builder.build().launchJob(null);
-        ExecutionResult result = this.currentJobMonitor.get();
-        boolean isEarlyStopped = ((GobblinHelixDistributeJobExecutionLauncher.DistributeJobResult) result).isEarlyStopped();
-        boolean isRetriggerEnabled = this.isRetriggeringEnabled();
-        if (isEarlyStopped && isRetriggerEnabled) {
-          log.info("DistributeJob {} will be re-triggered.", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
-        } else {
-          break;
+        try (Closer closer = Closer.create()) {
+          GobblinHelixDistributeJobExecutionLauncher launcher = builder.build();
+          closer.register(launcher);
+          this.currentJobMonitor = launcher.launchJob(null);
+          ExecutionResult result = this.currentJobMonitor.get();
+          boolean isEarlyStopped = ((GobblinHelixDistributeJobExecutionLauncher.DistributeJobResult) result).isEarlyStopped();
+          boolean isRetriggerEnabled = this.isRetriggeringEnabled();
+          if (isEarlyStopped && isRetriggerEnabled) {
+            log.info("DistributeJob {} will be re-triggered.", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+          } else {
+            break;
+          }
+          currentJobMonitor = null;
+        } catch (Throwable t) {
+          throw new JobException("Failed to launch and run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), t);
         }
-        currentJobMonitor = null;
       }
     } catch (Exception e) {
       log.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index ea4e6f7..8b9d5af 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -99,6 +99,7 @@ public class HelixUtils {
 
     WorkflowConfig workflowConfig = helixTaskDriver.getWorkflowConfig(helixManager, queueName);
 
+    log.info("[DELETE] workflow {} in the beginning", queueName);
     // If the queue is present, but in delete state then wait for cleanup before recreating the queue
     if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) {
       new TaskDriver(helixManager).deleteAndWaitForCompletion(queueName, jobQueueDeleteTimeoutSeconds);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
index 03d4d42..e65b968 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
@@ -53,11 +53,15 @@ public abstract class TaskRunnerSuiteBase {
   protected TaskFactory taskFactory;
   protected TaskFactory jobFactory;
   protected MetricContext metricContext;
+  protected String applicationId;
+  protected String applicationName;
   protected StandardMetricsBridge.StandardMetrics taskMetrics;
   protected List<Service> services = Lists.newArrayList();
 
   protected TaskRunnerSuiteBase(Builder builder) {
     this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(builder.config), this.getClass());
+    this.applicationId = builder.getApplicationId();
+    this.applicationName = builder.getApplicationName();
   }
 
   protected MetricContext getMetricContext() {
@@ -70,6 +74,14 @@ public abstract class TaskRunnerSuiteBase {
 
   protected abstract List<Service> getServices();
 
+  protected String getApplicationId() {
+    return this.applicationId;
+  }
+
+  protected String getApplicationName() {
+    return this.applicationName;
+  }
+
   @Getter
   public static class Builder {
     private Config config;
@@ -77,6 +89,8 @@ public abstract class TaskRunnerSuiteBase {
     private Optional<ContainerMetrics> containerMetrics;
     private FileSystem fs;
     private Path appWorkPath;
+    private String applicationId;
+    private String applicationName;
 
     public Builder(Config config) {
       this.config = config;
@@ -87,6 +101,16 @@ public abstract class TaskRunnerSuiteBase {
       return this;
     }
 
+    public Builder setApplicationName(String applicationName) {
+      this.applicationName = applicationName;
+      return this;
+    }
+
+    public Builder setApplicationId(String applicationId) {
+      this.applicationId = applicationId;
+      return this;
+    }
+
     public Builder setContainerMetrics(Optional<ContainerMetrics> containerMetrics) {
       this.containerMetrics = containerMetrics;
       return this;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
index 4f3a1e0..cddf519 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
@@ -47,7 +47,7 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase {
   TaskRunnerSuiteThreadModel(TaskRunnerSuiteBase.Builder builder) {
     super(builder);
     this.taskExecutor = new TaskExecutor(ConfigUtils.configToProperties(builder.getConfig()));
-    this.taskFactory = getInProcessTaskFactory(taskExecutor, builder);
+    this.taskFactory = generateTaskFactory(taskExecutor, builder);
     this.jobFactory = new GobblinHelixJobFactory(builder);
     this.taskMetrics = new GobblinTaskRunnerMetrics.InProcessTaskRunnerMetrics(taskExecutor, metricContext);
   }
@@ -70,7 +70,7 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase {
     return this.services;
   }
 
-  private TaskFactory getInProcessTaskFactory(TaskExecutor taskExecutor, Builder builder) {
+  private TaskFactory generateTaskFactory(TaskExecutor taskExecutor, Builder builder) {
     Properties properties = ConfigUtils.configToProperties(builder.getConfig());
     URI rootPathUri = PathUtils.getRootPath(builder.getAppWorkPath()).toUri();
     Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties)

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
index 1fdcda5..476747d 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
@@ -27,7 +27,6 @@ import org.apache.helix.task.TaskFactory;
 import org.testng.Assert;
 
 import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -56,19 +55,20 @@ public class TaskRunnerSuiteForJobFactoryTest extends TaskRunnerSuiteThreadModel
   public class TestJobFactory extends GobblinHelixJobFactory {
     public TestJobFactory(IntegrationJobFactorySuite.TestJobFactorySuiteBuilder builder) {
       super (builder);
+      this.builder = builder;
     }
 
     @Override
     public Task createNewTask(TaskCallbackContext context) {
-      return new TestHelixJobTask(context, this.sysConfig, stateStores);
+      return new TestHelixJobTask(context, stateStores, builder);
     }
   }
 
   public class TestHelixJobTask extends GobblinHelixJobTask {
     public TestHelixJobTask(TaskCallbackContext context,
-        Config sysConfig,
-        StateStores stateStores) {
-      super(context, sysConfig, stateStores);
+        StateStores stateStores,
+        TaskRunnerSuiteBase.Builder builder) {
+      super(context, stateStores, builder);
     }
 
     //TODO: change below to Helix UserConentStore

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
index 0487c9c..5166395 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
@@ -43,7 +43,8 @@ public class IntegrationJobFactorySuite extends IntegrationBasicSuite {
   protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
     Config newConfig = ConfigFactory.parseMap(ImmutableMap.of(
         GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED, true,
-        GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, "TestDistributedExecutionLauncherBuilder"));
+        GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, "TestDistributedExecutionLauncherBuilder"))
+        .withFallback(rawJobConfig);
     return ImmutableMap.of("HelloWorldJob", newConfig);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java
index 3f77493..3b990f9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java
@@ -18,7 +18,7 @@
 package org.apache.gobblin.runtime.api;
 
 /**
- * An object which describes the result after job completion. This can be retrieved by {@link JobExecutionFuture#get()}
+ * An object which describes the result after job completion. This can be retrieved by {@link JobExecutionMonitor#get()}
  *
  * @see JobExecutionResult as a derived class.
  */