You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/03/08 20:22:29 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1071] Retry task initialization

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b011aae  [GOBBLIN-1071] Retry task initialization
b011aae is described below

commit b011aaeddd976b742ddb04a667d233c9a698c451
Author: autumnust <le...@linkedin.com>
AuthorDate: Sun Mar 8 13:22:17 2020 -0700

    [GOBBLIN-1071] Retry task initialization
    
    Retry task initialization
    
    Remove the task-cancel blocking code (it will be
    added in a separate PR)
    
    Address comments
    
    Fix travis failure
    
    Fix travis failure II
    
    Fix travis failure III
    
    Closes #2909 from autumnust/retryTaskCreation
---
 .../apache/gobblin/cluster/GobblinHelixTask.java   | 52 ++++++++----
 .../apache/gobblin/cluster/GobblinTaskRunner.java  | 26 +++---
 .../gobblin/cluster/InMemorySingleTaskRunner.java  | 26 +++++-
 .../cluster/InMemoryWuFailedSingleTask.java        |  2 +-
 .../gobblin/cluster/InMemoryWuSingleTask.java      |  2 +-
 .../org/apache/gobblin/cluster/SingleTask.java     | 32 +++++---
 .../apache/gobblin/cluster/SingleTaskRunner.java   | 21 +++--
 .../gobblin/cluster/GobblinHelixTaskTest.java      | 96 +++++++++++++++-------
 ...estSingleTaskRerun.java => TestSingleTask.java} | 36 ++++++--
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   | 65 ++++++++++++---
 .../main/java/org/apache/gobblin/runtime/Task.java |  1 -
 .../gobblin/runtime/util/RuntimeConstructs.java    |  4 +
 .../apache/gobblin/TaskErrorIntegrationTest.java   | 76 +++++++++++++----
 .../apache/gobblin/util/retry/RetryerFactory.java  | 19 ++++-
 .../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 10 +--
 15 files changed, 333 insertions(+), 135 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index 122a8d5..21e420d 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -18,7 +18,11 @@
 package org.apache.gobblin.cluster;
 
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.retry.RetryerFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.task.JobContext;
@@ -29,6 +33,7 @@ import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskResult;
 import org.slf4j.MDC;
 
+import com.github.rholder.retry.Retryer;
 import com.google.common.base.Throwables;
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
@@ -97,28 +102,37 @@ public class GobblinHelixTask implements Task {
                              builder.getAppWorkPath(),
                              this.jobId);
 
-    Config dynamicConfig = builder.getDynamicConfig()
-        .withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getHostName()))
-        .withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, ConfigValueFactory.fromAnyRef(builder.getContainerId()))
-        .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getInstanceName()))
-        .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixJobId))
-        .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixTaskId));
-
     Integer partitionNum = getPartitionForHelixTask(taskDriver);
-
     if (partitionNum == null) {
       throw new IllegalStateException(String.format("Task %s, job %s on instance %s has no partition assigned",
           this.helixTaskId, builder.getInstanceName(), this.helixJobId));
     }
 
-    dynamicConfig = dynamicConfig.withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, ConfigValueFactory.fromAnyRef(partitionNum));
-    this.task = new SingleTask(this.jobId,
-                               this.workUnitFilePath,
-                               jobStateFilePath,
-                               builder.getFs(),
-                               taskAttemptBuilder,
-                               stateStores,
-                               dynamicConfig);
+    // Dynamic config is considered as part of JobState in SingleTask
+    // Important to distinguish between dynamicConfig and Config
+    final Config dynamicConfig = builder.getDynamicConfig()
+        .withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getHostName()))
+        .withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, ConfigValueFactory.fromAnyRef(builder.getContainerId()))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getInstanceName()))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixJobId))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixTaskId))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, ConfigValueFactory.fromAnyRef(partitionNum));
+
+    Retryer<SingleTask> retryer = RetryerFactory.newInstance(builder.getConfig());
+
+    try {
+      this.task = retryer.call(new Callable<SingleTask>() {
+        @Override
+        public SingleTask call()
+            throws Exception {
+          return new SingleTask(jobId, workUnitFilePath, jobStateFilePath, builder.getFs(), taskAttemptBuilder,
+              stateStores,
+              dynamicConfig);
+        }
+      });
+    } catch (Exception e) {
+      throw new RuntimeException("Execution in creating a SingleTask-with-retry failed", e);
+    }
   }
 
   private void getInfoFromTaskConfig() {
@@ -170,7 +184,9 @@ public class GobblinHelixTask implements Task {
 
   @Override
   public void cancel() {
-    log.warn("Gobblin helix task cancellation invoked.");
-    this.task.cancel();
+    log.info("Gobblin helix task cancellation invoked.");
+    if (this.task != null ) {
+      this.task.cancel();
+    }
   }
 }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 4504cb5..d71dbd9 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -142,7 +142,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
 
   protected final EventBus eventBus = new EventBus(GobblinTaskRunner.class.getSimpleName());
 
-  protected final Config config;
+  protected final Config clusterConfig;
 
   @Getter
   protected final FileSystem fs;
@@ -171,8 +171,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     Configuration conf = HadoopUtils.newConfiguration();
     this.fs = buildFileSystem(config, conf);
     this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
-    this.config = saveConfigToFile(config);
-    this.clusterName = this.config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    this.clusterConfig = saveConfigToFile(config);
+    this.clusterName = this.clusterConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
 
     //Set system properties passed in via application config. As an example, Helix uses System#getProperty() for ZK configuration
     // overrides such as sessionTimeout. In this case, the overrides specified
@@ -183,7 +183,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
 
     this.containerMetrics = buildContainerMetrics();
 
-    String builderStr = ConfigUtils.getString(this.config,
+    String builderStr = ConfigUtils.getString(this.clusterConfig,
         GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER,
         TaskRunnerSuiteBase.Builder.class.getName());
 
@@ -196,7 +196,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
 
     TaskRunnerSuiteBase.Builder builder = GobblinConstructorUtils.<TaskRunnerSuiteBase.Builder>invokeLongestConstructor(
           new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class)
-              .resolveClass(builderStr), this.config);
+              .resolveClass(builderStr), this.clusterConfig);
 
     TaskRunnerSuiteBase suite = builder.setAppWorkPath(this.appWorkPath)
         .setContainerMetrics(this.containerMetrics)
@@ -238,13 +238,13 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
 
   private void initHelixManager() {
     String zkConnectionString =
-        this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+        this.clusterConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
     logger.info("Using ZooKeeper connection string: " + zkConnectionString);
 
     if (this.isTaskDriver && this.dedicatedTaskDriverCluster) {
       // This will create a Helix manager to receive the planning job
       this.taskDriverHelixManager = Optional.of(HelixManagerFactory.getZKHelixManager(
-          ConfigUtils.getString(this.config, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, ""),
+          ConfigUtils.getString(this.clusterConfig, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, ""),
           this.helixInstanceName,
           InstanceType.PARTICIPANT,
           zkConnectionString));
@@ -301,7 +301,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     // Start metric reporting
     if (this.containerMetrics.isPresent()) {
       this.containerMetrics.get()
-          .startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.config),
+          .startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.clusterConfig),
               this.taskRunnerId);
     }
 
@@ -355,9 +355,9 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
    */
   protected List<Service> getServices() {
     List<Service> serviceList = new ArrayList<>();
-    if (ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.CONTAINER_HEALTH_METRICS_SERVICE_ENABLED,
+    if (ConfigUtils.getBoolean(this.clusterConfig, GobblinClusterConfigurationKeys.CONTAINER_HEALTH_METRICS_SERVICE_ENABLED,
         GobblinClusterConfigurationKeys.DEFAULT_CONTAINER_HEALTH_METRICS_SERVICE_ENABLED)) {
-      serviceList.add(new ContainerHealthMetricsService(config));
+      serviceList.add(new ContainerHealthMetricsService(clusterConfig));
     }
     return serviceList;
   }
@@ -392,7 +392,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
    * the job with EXAMPLE_INSTANCE_TAG will remain in the ZK until an instance with EXAMPLE_INSTANCE_TAG was found.
    */
   private void addInstanceTags() {
-    List<String> tags = ConfigUtils.getStringList(this.config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY);
+    List<String> tags = ConfigUtils.getStringList(this.clusterConfig, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY);
     HelixManager receiverManager = getReceiverManager();
     if (receiverManager.isConnected()) {
       if (!tags.isEmpty()) {
@@ -449,10 +449,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
   }
 
   private Optional<ContainerMetrics> buildContainerMetrics() {
-    Properties properties = ConfigUtils.configToProperties(this.config);
+    Properties properties = ConfigUtils.configToProperties(this.clusterConfig);
     if (GobblinMetrics.isEnabled(properties)) {
       return Optional.of(ContainerMetrics
-          .get(ConfigUtils.configToState(config), this.applicationName, this.taskRunnerId));
+          .get(ConfigUtils.configToState(clusterConfig), this.applicationName, this.taskRunnerId));
     } else {
       return Optional.absent();
     }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemorySingleTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemorySingleTaskRunner.java
index 7af586e..b411ff8 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemorySingleTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemorySingleTaskRunner.java
@@ -17,25 +17,43 @@
 
 package org.apache.gobblin.cluster;
 
+import java.io.IOException;
+
 import org.apache.gobblin.runtime.util.StateStores;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
 
 /**
  * An taskRunner for in-memory {@link SingleTask} that can be switched to run a meant-to-failed task.
+ * This class is primarily used for testing purposes.
  */
 public class InMemorySingleTaskRunner extends SingleTaskRunner {
+  // Extra configuration injected via setInjectedConfig(); applied as a fallback to the dynamic config.
+  private Config injectedConfig = ConfigFactory.empty();
+
   public InMemorySingleTaskRunner(String clusterConfigFilePath, String jobId, String workUnitFilePath) {
     super(clusterConfigFilePath, jobId, workUnitFilePath);
   }
 
   @Override
-  protected SingleTask createTaskAttempt(TaskAttemptBuilder taskAttemptBuilder, FileSystem fs, StateStores stateStores,
-      Path jobStateFilePath, boolean fail) {
+  protected SingleTask createSingleTaskHelper(TaskAttemptBuilder taskAttemptBuilder, FileSystem fs,
+      StateStores stateStores, Path jobStateFilePath, boolean fail)
+      throws IOException {
     return !fail ? new InMemoryWuSingleTask(this.jobId, new Path(this.workUnitFilePath), jobStateFilePath, fs,
-        taskAttemptBuilder, stateStores, GobblinClusterUtils.getDynamicConfig(this.clusterConfig))
+        taskAttemptBuilder, stateStores,
+        GobblinClusterUtils.getDynamicConfig(this.clusterConfig).withFallback(injectedConfig))
         : new InMemoryWuFailedSingleTask(this.jobId, new Path(this.workUnitFilePath), jobStateFilePath, fs,
-            taskAttemptBuilder, stateStores, GobblinClusterUtils.getDynamicConfig(this.clusterConfig));
+            taskAttemptBuilder, stateStores,
+            GobblinClusterUtils.getDynamicConfig(this.clusterConfig).withFallback(injectedConfig));
+  }
+
+  @VisibleForTesting
+  void setInjectedConfig(Config injectedConfig) {
+    this.injectedConfig = injectedConfig;
   }
 }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
index 75cbf61..43f6944 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
@@ -38,7 +38,7 @@ import com.typesafe.config.Config;
  */
 public class InMemoryWuFailedSingleTask extends SingleTask {
   public InMemoryWuFailedSingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs,
-      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) {
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) throws IOException {
     super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, stateStores, dynamicConfig);
   }
 
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
index f60ba9a..2e66cd5 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
@@ -41,7 +41,7 @@ import com.typesafe.config.Config;
  */
 public class InMemoryWuSingleTask extends SingleTask {
   public InMemoryWuSingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs,
-      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) {
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) throws IOException {
     super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, stateStores, dynamicConfig);
   }
 
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 16a64fa..be0f0de 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -50,8 +50,10 @@ import org.apache.gobblin.util.SerializationUtils;
 public class SingleTask {
 
   private static final Logger _logger = LoggerFactory.getLogger(SingleTask.class);
+  public static final String MAX_RETRY_WAITING_FOR_INIT_KEY = "maxRetryBlockedOnTaskAttemptInit";
+  public static final int DEFAULT_MAX_RETRY_WAITING_FOR_INIT = 2;
 
-  private GobblinMultiTaskAttempt _taskattempt;
+  private GobblinMultiTaskAttempt _taskAttempt;
   private String _jobId;
   private Path _workUnitFilePath;
   private Path _jobStateFilePath;
@@ -59,9 +61,14 @@ public class SingleTask {
   private TaskAttemptBuilder _taskAttemptBuilder;
   private StateStores _stateStores;
   private Config _dynamicConfig;
+  private JobState _jobState;
 
+  /**
+   * Performs the heavy-lifting initialization (e.g. loading the job state) in the constructor so that
+   * it can be retried on failure; see the example in {@link GobblinHelixTask}.
+   */
   SingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs,
-      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) {
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) throws IOException {
     _jobId = jobId;
     _workUnitFilePath = workUnitFilePath;
     _jobStateFilePath = jobStateFilePath;
@@ -69,30 +76,31 @@ public class SingleTask {
     _taskAttemptBuilder = taskAttemptBuilder;
     _stateStores = stateStores;
     _dynamicConfig = dynamicConfig;
+    _jobState = getJobState();
   }
 
   public void run()
       throws IOException, InterruptedException {
     List<WorkUnit> workUnits = getWorkUnits();
 
-    JobState jobState = getJobState();
     // Add dynamic configuration to the job state
-    _dynamicConfig.entrySet().forEach(e -> jobState.setProp(e.getKey(), e.getValue().unwrapped().toString()));
+    _dynamicConfig.entrySet().forEach(e -> _jobState.setProp(e.getKey(), e.getValue().unwrapped().toString()));
 
-    Config jobConfig = getConfigFromJobState(jobState);
+    Config jobConfig = getConfigFromJobState(_jobState);
 
     _logger.debug("SingleTask.run: jobId {} workUnitFilePath {} jobStateFilePath {} jobState {} jobConfig {}",
-        _jobId, _workUnitFilePath, _jobStateFilePath, jobState, jobConfig);
+        _jobId, _workUnitFilePath, _jobStateFilePath, _jobState, jobConfig);
 
     try (SharedResourcesBroker<GobblinScopeTypes> globalBroker = SharedResourcesBrokerFactory
         .createDefaultTopLevelBroker(jobConfig, GobblinScopeTypes.GLOBAL.defaultScopeInstance())) {
-      SharedResourcesBroker<GobblinScopeTypes> jobBroker = getJobBroker(jobState, globalBroker);
+      SharedResourcesBroker<GobblinScopeTypes> jobBroker = getJobBroker(_jobState, globalBroker);
+
+      _taskAttempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId, _jobState, jobBroker);
+      _taskAttempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
 
-      _taskattempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId, jobState, jobBroker);
-      _taskattempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
     } finally {
       _logger.info("Clearing all metrics object in cache.");
-      _taskattempt.cleanMetrics();
+      _taskAttempt.cleanMetrics();
     }
   }
 
@@ -152,10 +160,10 @@ public class SingleTask {
   }
 
   public void cancel() {
-    if (_taskattempt != null) {
+    if (_taskAttempt != null) {
       try {
         _logger.info("Task cancelled: Shutdown starting for tasks with jobId: {}", _jobId);
-        _taskattempt.shutdownTasks();
+        _taskAttempt.shutdownTasks();
         _logger.info("Task cancelled: Shutdown complete for tasks with jobId: {}", _jobId);
       } catch (InterruptedException e) {
         throw new RuntimeException("Interrupted while shutting down task with jobId: " + _jobId, e);
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
index 73d535f..de97fec 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Service;
 import com.google.common.util.concurrent.ServiceManager;
@@ -53,7 +54,8 @@ class SingleTaskRunner {
   protected final String workUnitFilePath;
   protected final Config clusterConfig;
   private final Path appWorkPath;
-  private SingleTask task;
+  @VisibleForTesting
+  SingleTask task;
   private TaskExecutor taskExecutor;
   private GobblinHelixTaskStateTracker taskStateTracker;
   private ServiceManager serviceManager;
@@ -83,9 +85,10 @@ class SingleTaskRunner {
     shutdownServices();
   }
 
-  private void startServices() {
+  @VisibleForTesting
+  void startServices() {
     logger.info("SingleTaskRunner start services.");
-    getServices();
+    initServices();
     this.serviceManager.startAsync();
     try {
       this.serviceManager.awaitHealthy(10, TimeUnit.SECONDS);
@@ -107,11 +110,11 @@ class SingleTaskRunner {
   private void runTask(boolean fail)
       throws IOException, InterruptedException {
     logger.info("SingleTaskRunner running task.");
-    getClusterSingleTask(fail);
+    initClusterSingleTask(fail);
     this.task.run();
   }
 
-  private void getClusterSingleTask(boolean fail)
+  void initClusterSingleTask(boolean fail)
       throws IOException {
     final FileSystem fs = getFileSystem();
     final StateStores stateStores = new StateStores(this.clusterConfig, this.appWorkPath,
@@ -123,11 +126,11 @@ class SingleTaskRunner {
 
     final TaskAttemptBuilder taskAttemptBuilder = getTaskAttemptBuilder(stateStores);
 
-    this.task = createTaskAttempt(taskAttemptBuilder, fs, stateStores, jobStateFilePath, fail);
+    this.task = createSingleTaskHelper(taskAttemptBuilder, fs, stateStores, jobStateFilePath, fail);
   }
 
-  protected SingleTask createTaskAttempt(TaskAttemptBuilder taskAttemptBuilder, FileSystem fs,
-      StateStores stateStores, Path jobStateFilePath, boolean fail) {
+  protected SingleTask createSingleTaskHelper(TaskAttemptBuilder taskAttemptBuilder, FileSystem fs,
+      StateStores stateStores, Path jobStateFilePath, boolean fail) throws IOException {
     return new SingleTask(this.jobId, new Path(this.workUnitFilePath), jobStateFilePath, fs,
         taskAttemptBuilder, stateStores, GobblinClusterUtils.getDynamicConfig(this.clusterConfig));
   }
@@ -140,7 +143,7 @@ class SingleTaskRunner {
     return taskAttemptBuilder;
   }
 
-  private void getServices() {
+  private void initServices() {
     final Properties properties = ConfigUtils.configToProperties(this.clusterConfig);
     this.taskExecutor = new TaskExecutor(properties);
     this.taskStateTracker = new GobblinHelixTaskStateTracker(properties);
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
index 24c597e..ce25fb9 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
@@ -23,6 +23,20 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.avro.Schema;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.example.simplejson.SimpleJsonConverter;
+import org.apache.gobblin.example.simplejson.SimpleJsonSource;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskExecutor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.Id;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.retry.RetryerFactory;
+import org.apache.gobblin.writer.AvroDataWriterBuilder;
+import org.apache.gobblin.writer.Destination;
+import org.apache.gobblin.writer.WriterOutputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -44,20 +58,11 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.example.simplejson.SimpleJsonConverter;
-import org.apache.gobblin.example.simplejson.SimpleJsonSource;
-import org.apache.gobblin.metastore.FsStateStore;
-import org.apache.gobblin.runtime.AbstractJobLauncher;
-import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.TaskExecutor;
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.util.Id;
-import org.apache.gobblin.util.SerializationUtils;
-import org.apache.gobblin.writer.AvroDataWriterBuilder;
-import org.apache.gobblin.writer.Destination;
-import org.apache.gobblin.writer.WriterOutputFormat;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
+import static org.mockito.Mockito.when;
 
 
 /**
@@ -96,7 +101,7 @@ public class GobblinHelixTaskTest {
     this.taskExecutor = new TaskExecutor(configuration);
 
     this.helixManager = Mockito.mock(HelixManager.class);
-    Mockito.when(this.helixManager.getInstanceName()).thenReturn(GobblinHelixTaskTest.class.getSimpleName());
+    when(this.helixManager.getInstanceName()).thenReturn(GobblinHelixTaskTest.class.getSimpleName());
     this.taskStateTracker = new GobblinHelixTaskStateTracker(new Properties());
 
     this.localFs = FileSystem.getLocal(configuration);
@@ -143,21 +148,14 @@ public class GobblinHelixTaskTest {
 
     TaskConfig taskConfig = new TaskConfig("", taskConfigMap, true);
     TaskCallbackContext taskCallbackContext = Mockito.mock(TaskCallbackContext.class);
-    Mockito.when(taskCallbackContext.getTaskConfig()).thenReturn(taskConfig);
-    Mockito.when(taskCallbackContext.getManager()).thenReturn(this.helixManager);
-    String helixJobId = Joiner.on("_").join(TestHelper.TEST_JOB_ID, TestHelper.TEST_JOB_ID);
-    JobConfig jobConfig = Mockito.mock(JobConfig.class);
-
-    Mockito.when(jobConfig.getJobId()).thenReturn(helixJobId);
-    Mockito.when(taskCallbackContext.getJobConfig()).thenReturn(jobConfig);
-    JobContext mockJobContext = Mockito.mock(JobContext.class);
-    Map<String, Integer> taskIdPartitionMap = ImmutableMap.of(taskConfig.getId(), 0);
-    Mockito.when(mockJobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
-
-    TaskDriver taskDriver = Mockito.mock(TaskDriver.class);
-    Mockito.when(taskDriver.getJobContext(Mockito.anyString())).thenReturn(mockJobContext);
-
-    TaskRunnerSuiteBase.Builder builder = new TaskRunnerSuiteBase.Builder(ConfigFactory.empty());
+    when(taskCallbackContext.getTaskConfig()).thenReturn(taskConfig);
+    when(taskCallbackContext.getManager()).thenReturn(this.helixManager);
+    TaskDriver taskDriver = createTaskDriverWithMockedAttributes(taskCallbackContext, taskConfig);
+
+    TaskRunnerSuiteBase.Builder builder = new TaskRunnerSuiteBase.Builder(ConfigFactory.empty()
+        .withValue(RETRY_TYPE, ConfigValueFactory.fromAnyRef(RetryerFactory.RetryType.FIXED_ATTEMPT.name()))
+        .withValue(RETRY_TIMES, ConfigValueFactory.fromAnyRef(2))
+    );
     TaskRunnerSuiteBase sb = builder.setInstanceName("TestInstance")
         .setApplicationName("TestApplication")
         .setAppWorkPath(appWorkDir)
@@ -174,8 +172,46 @@ public class GobblinHelixTaskTest {
                                     ConfigFactory.empty(),
                                     Optional.of(taskDriver));
 
+    // Expect to go through.
     this.gobblinHelixTask = (GobblinHelixTask) gobblinHelixTaskFactory.createNewTask(taskCallbackContext);
-    Thread.sleep(1000);
+
+    // Mock getFs(), which is called when constructing the SingleTask, so that SingleTask creation fails and triggers
+    // retries; creation eventually fails for good once the configured retry attempts (RETRY_TIMES) are exhausted.
+    TaskRunnerSuiteBase.Builder builderSpy = Mockito.spy(builder);
+    when(builderSpy.getFs()).thenThrow(new RuntimeException("failure on purpose"));
+    gobblinHelixTaskFactory =
+        new GobblinHelixTaskFactory(builderSpy,
+            sb.metricContext,
+            this.taskStateTracker,
+            ConfigFactory.empty(),
+            Optional.of(taskDriver));
+    try {
+      gobblinHelixTaskFactory.createNewTask(taskCallbackContext);
+    } catch (Exception e) {
+      Assert.assertTrue(e.getMessage().contains("Execution in creating a SingleTask-with-retry failed"));
+      return;
+    }
+    // Won't reach here.
+    Assert.fail();
+  }
+
+  /**
+   * To test against org.apache.gobblin.cluster.GobblinHelixTask#getPartitionForHelixTask(org.apache.helix.task.TaskDriver)
+   * we need to assign the right partition id for each helix task, which would be queried from taskDriver.
+   * This method encapsulates all the mocking steps needed for the taskDriver object to return the expected values.
+   */
+  private TaskDriver createTaskDriverWithMockedAttributes(TaskCallbackContext taskCallbackContext,
+      TaskConfig taskConfig) {
+    String helixJobId = Joiner.on("_").join(TestHelper.TEST_JOB_ID, TestHelper.TEST_JOB_ID);
+    JobConfig jobConfig = Mockito.mock(JobConfig.class);
+    when(jobConfig.getJobId()).thenReturn(helixJobId);
+    when(taskCallbackContext.getJobConfig()).thenReturn(jobConfig);
+    JobContext mockJobContext = Mockito.mock(JobContext.class);
+    Map<String, Integer> taskIdPartitionMap = ImmutableMap.of(taskConfig.getId(), 0);
+    when(mockJobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
+    TaskDriver taskDriver = Mockito.mock(TaskDriver.class);
+    when(taskDriver.getJobContext(Mockito.anyString())).thenReturn(mockJobContext);
+    return taskDriver;
   }
 
   @Test(dependsOnMethods = "testPrepareTask")
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTaskRerun.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
similarity index 68%
rename from gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTaskRerun.java
rename to gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
index 4fa3d74..2b6499c 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTaskRerun.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
@@ -17,9 +17,22 @@
 
 package org.apache.gobblin.cluster;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+
+import org.apache.gobblin.testing.AssertWithBackoff;
 import org.junit.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.ConfigFactory;
+
+import javax.annotation.Nullable;
+
+import static org.apache.gobblin.cluster.SingleTask.MAX_RETRY_WAITING_FOR_INIT_KEY;
+
 
 /**
  * Notes & Usage:
@@ -28,7 +41,18 @@ import org.testng.annotations.Test;
  * 2. When needed to reproduce certain errors, replace org.apache.gobblin.cluster.DummySource.DummyExtractor or
  * {@link DummySource} to plug in required logic.
  */
-public class TestSingleTaskRerun {
+public class TestSingleTask {
+
+  /** Builds an in-memory runner over the "clusterConf" test config and the serialized test work unit. */
+  private InMemorySingleTaskRunner createInMemoryTaskRunner() {
+    final ClassLoader loader = this.getClass().getClassLoader();
+    final String clusterConfPath = loader.getResource("clusterConf").getPath();
+    final String workUnitPath = loader.getResource("_workunits/store/workunit.wu").getPath();
+
+    InMemorySingleTaskRunner runner = new InMemorySingleTaskRunner(clusterConfPath, "testJob", workUnitPath);
+
+    return runner;
+  }
 
   /**
    * An in-memory {@link SingleTask} runner that could be used to simulate how it works in Gobblin-Cluster.
@@ -36,15 +60,9 @@ public class TestSingleTaskRerun {
    * re-run it again.
    */
   @Test
-  public void testMetricObjectCasting()
+  public void testSingleTaskRerunAfterFailure()
       throws Exception {
-    final String clusterConfigPath = "clusterConf";
-    final String wuPath = "_workunits/store/workunit.wu";
-    String clusterConfPath = this.getClass().getClassLoader().getResource(clusterConfigPath).getPath();
-
-    InMemorySingleTaskRunner inMemorySingleTaskRunner =
-        new InMemorySingleTaskRunner(clusterConfPath, "testJob",
-            this.getClass().getClassLoader().getResource(wuPath).getPath());
+    SingleTaskRunner inMemorySingleTaskRunner = createInMemoryTaskRunner();
     try {
       inMemorySingleTaskRunner.run(true);
     } catch (Exception e) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 402a3ff..eded48c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -28,20 +28,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 
-import org.apache.gobblin.metrics.GobblinMetrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
-import javax.annotation.Nullable;
-import lombok.Setter;
-
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance;
@@ -61,10 +50,30 @@ import org.apache.gobblin.runtime.task.TaskUtils;
 import org.apache.gobblin.runtime.util.JobMetrics;
 import org.apache.gobblin.runtime.util.TaskMetrics;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.TaskEventMetadataUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.gobblin.util.retry.RetryerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import javax.annotation.Nullable;
+import lombok.Setter;
+
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIME_OUT_MS;
 
 
 /**
@@ -393,7 +402,7 @@ public class GobblinMultiTaskAttempt {
       Task task = null;
       try {
         countDownLatch.countUp();
-        task = createTaskRunnable(workUnitState, countDownLatch);
+        task = createTaskWithRetry(workUnitState, countDownLatch);
         this.taskStateTracker.registerNewTask(task);
         task.setTaskFuture(this.taskExecutor.submit(task));
         tasks.add(task);
@@ -456,6 +465,36 @@ public class GobblinMultiTaskAttempt {
     }
   }
 
+  /**
+   * Creates the {@link Task} for {@code workUnitState}, retrying on failure since task initialization may depend on
+   * unstable external connections that can heal. Retry interval (2s) and timeout (1min) override job properties.
+   */
+  private Task createTaskWithRetry(WorkUnitState workUnitState, CountDownLatch countDownLatch) {
+    Config config = ConfigUtils.propertiesToConfig(this.jobState.getProperties())
+        .withValue(RETRY_TIME_OUT_MS, ConfigValueFactory.fromAnyRef(TimeUnit.MINUTES.toMillis(1L)))
+        .withValue(RETRY_INTERVAL_MS, ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.toMillis(2L)));
+    Retryer<Task> retryer = RetryerFactory.newInstance(config);
+    // Attempt counter (first try included), for logging only; atomic so the anonymous Callable can update it.
+    final AtomicInteger attempts = new AtomicInteger(0);
+
+    try {
+      return retryer.call(new Callable<Task>() {
+        @Override
+        public Task call() throws Exception {
+          log.info("Task creation attempt {}", attempts.incrementAndGet());
+          return createTaskRunnable(workUnitState, countDownLatch);
+        }
+      });
+    } catch (RetryException re) {
+      // Retries exhausted: surface the last attempt's exception rather than the RetryException wrapper.
+      log.error("Fatal exception creating Task after {} attempt(s)", attempts.get());
+      throw Throwables.propagate(re.getLastFailedAttempt().getExceptionCause());
+    } catch (ExecutionException ee) {
+      // Presumably an attempt failed with an exception the retry predicate does not retry on (guava-retrying).
+      throw new RuntimeException("Failure in executing retryer", ee);
+    }
+  }
+
   public void runAndOptionallyCommitTaskAttempt(CommitPolicy multiTaskAttemptCommitPolicy)
       throws IOException, InterruptedException {
     run();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index ecb7c66..3bc36be 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -237,7 +237,6 @@ public class Task implements TaskIFace {
     this.shutdownLatch = new CountDownLatch(1);
 
     // Setup Streaming constructs
-
     if (isStreamingTask()) {
       Extractor underlyingExtractor = this.taskContext.getRawSourceExtractor();
       if (!(underlyingExtractor instanceof StreamingExtractor)) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/RuntimeConstructs.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/RuntimeConstructs.java
index 5060bac..f4ec28b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/RuntimeConstructs.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/RuntimeConstructs.java
@@ -20,6 +20,10 @@ package org.apache.gobblin.runtime.util;
 import org.apache.gobblin.runtime.fork.Fork;
 
 
+/**
+ * @deprecated due to non-essential usage within this repository.
+ */
+@Deprecated
 public enum RuntimeConstructs {
 
   FORK("Fork", Fork.class);
diff --git a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
index 9110d20..473719f 100644
--- a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
+++ b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
@@ -24,14 +24,24 @@ import java.util.Properties;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.instrumented.extractor.InstrumentedExtractor;
 import org.apache.gobblin.publisher.DataPublisher;
 import org.apache.gobblin.publisher.NoopPublisher;
+import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.task.BaseAbstractTask;
 import org.apache.gobblin.runtime.task.TaskFactory;
 import org.apache.gobblin.runtime.task.TaskIFace;
 import org.apache.gobblin.runtime.task.TaskUtils;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.retry.RetryerFactory;
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -43,15 +53,8 @@ import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.instrumented.extractor.InstrumentedExtractor;
-import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
-import org.apache.gobblin.source.Source;
-import org.apache.gobblin.source.extractor.DataRecordException;
-import org.apache.gobblin.source.extractor.Extractor;
-import org.apache.gobblin.source.workunit.WorkUnit;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
 
 
 @Test (singleThreaded = true)
@@ -67,7 +70,7 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
 
   /**
    * Test that an extractor that raises an error on creation results in a log message from {@link GobblinMultiTaskAttempt}
-   * and does not hang
+   * and does not hang.
    * @throws Exception
    */
   @Test
@@ -82,6 +85,9 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
 
     jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, BaseTestSource.class.getName());
     jobProperties.setProperty(TestExtractor.RAISE_ERROR, "true");
+    jobProperties.setProperty(RETRY_TYPE, RetryerFactory.RetryType.FIXED_ATTEMPT.name());
+    // Disable retry
+    jobProperties.setProperty(RETRY_TIMES, "1");
 
     GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
 
@@ -92,6 +98,31 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
   }
 
   /**
+   * Test that when extractor creation fails as above, retry kicks in and the extractor heals itself on a later attempt.
+   */
+  @Test
+  public void extractorCreationErrorWithRetry() throws Exception {
+    Properties jobProperties =
+        GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/skip_workunits_test.properties");
+
+    jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, BaseTestSource.class.getName());
+    jobProperties.setProperty(TestExtractor.RAISE_ERROR, "true");
+    jobProperties.setProperty(RETRY_TYPE, RetryerFactory.RetryType.FIXED_ATTEMPT.name());
+    jobProperties.setProperty(TestExtractor.ENABLE_RETRY_FLIP, "true");
+    // Enable retry: extractor creation should fail on the first attempt and succeed on the second.
+    jobProperties.setProperty(RETRY_TIMES, "2");
+    // RETRY_COUNT is static and may be advanced by extractors built in other tests; reset so the flip starts failing.
+    TestExtractor.RETRY_COUNT = 1;
+
+    // Any failure should fail the test; keep the underlying cause so the failure is diagnosable.
+    try {
+      GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+    } catch (Throwable t) {
+      throw new AssertionError("Job failed even though task-creation retry was enabled", t);
+    }
+  }
+
+  /**
    * Test that a task submission error results in a log message from {@link GobblinMultiTaskAttempt}
    * and does not hang
    * @throws Exception
@@ -129,26 +160,39 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
     Properties jobProperties =
         GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/skip_workunits_test.properties");
     jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, CustomizedTaskTestSource.class.getName());
+    // To demonstrate failure caught in task creation in test setting, disabled retry in task creation.
+    jobProperties.setProperty(RETRY_TIMES, "1");
+    jobProperties.setProperty(RETRY_TYPE, RetryerFactory.RetryType.FIXED_ATTEMPT.name());
 
     GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
-    Assert.assertTrue(testAppender.events.stream().anyMatch(e -> e.getRenderedMessage()
-        .startsWith("Encountering memory error")));
-
+    Assert.assertTrue(testAppender.events.stream().anyMatch(e -> e.getRenderedMessage().contains("Encountering memory error")));
     logger.removeAppender(testAppender);
   }
 
 
   /**
-   * Test extractor that can be configured to raise an exception on construction
+   * Test extractor that can be configured to raise an exception on construction,
+   * or, with {@code enableRetry} set, heal itself on every even-numbered construction attempt.
    */
   public static class TestExtractor<S, D> extends InstrumentedExtractor<S, D> {
     private static final String RAISE_ERROR = "raiseError";
+    private static int RETRY_COUNT = 1;
+    private static final String ENABLE_RETRY_FLIP = "enableRetry";
 
     public TestExtractor(WorkUnitState workUnitState) {
       super(workUnitState);
 
-      if (workUnitState.getPropAsBoolean(RAISE_ERROR, false)) {
-        throw new RuntimeException(EXCEPTION_MESSAGE);
+      try { // with ENABLE_RETRY_FLIP set, constructions at an even RETRY_COUNT succeed without raising
+        if (workUnitState.getPropAsBoolean(ENABLE_RETRY_FLIP, false) && RETRY_COUNT % 2 == 0) {
+          return;
+        }
+
+        if (workUnitState.getPropAsBoolean(RAISE_ERROR, false)) {
+          throw new RuntimeException(EXCEPTION_MESSAGE);
+        }
+      } finally {
+        // Always advance the static attempt counter, whether this construction returned or threw.
+        RETRY_COUNT += 1;
       }
     }
 
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
index e793bdd..97b033d 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
@@ -38,7 +38,7 @@ import org.apache.gobblin.exception.NonTransientException;
  * It's recommended to use with ConfigBuilder so that with State and with prefix of the config key,
  * user can easily instantiate Retryer.
  *
- * @see GoogleAnalyticsUnsampledExtractor for some examples.
+ * see GoogleAnalyticsUnsampledExtractor for some examples.
  *
  * @param <T>
  */
@@ -48,6 +48,8 @@ public class RetryerFactory<T> {
   public static final String RETRY_INTERVAL_MS = "interval_ms";
   public static final String RETRY_TIME_OUT_MS = "time_out_ms";
   public static final String RETRY_TYPE = "retry_type";
+  // Maximum number of attempts (including the first) for FIXED_ATTEMPT retry; must be >= 1.
+  public static final String RETRY_TIMES = "retry_times";
 
   private static final Predicate<Throwable> RETRY_EXCEPTION_PREDICATE;
   private static final Config DEFAULTS;
@@ -64,13 +66,15 @@ public class RetryerFactory<T> {
                                                 .put(RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(30L))
                                                 .put(RETRY_MULTIPLIER, 2L)
                                                 .put(RETRY_TYPE, RetryType.EXPONENTIAL.name())
+                                                .put(RETRY_TIMES, 2)
                                                 .build();
     DEFAULTS = ConfigFactory.parseMap(configMap);
   }
 
   public static enum RetryType {
     EXPONENTIAL,
-    FIXED;
+    FIXED,
+    FIXED_ATTEMPT; // fixed RETRY_INTERVAL_MS wait; stops after RETRY_TIMES attempts instead of a timeout
   }
 
   /**
@@ -90,6 +94,8 @@ public class RetryerFactory<T> {
         return newExponentialRetryer(config);
       case FIXED:
         return newFixedRetryer(config);
+      case FIXED_ATTEMPT:
+        return newFixedAttemptBoundRetryer(config);
       default:
         throw new IllegalArgumentException(type + " is not supported");
     }
@@ -104,7 +110,6 @@ public class RetryerFactory<T> {
   }
 
   private static <T> Retryer<T> newExponentialRetryer(Config config) {
-
     return RetryerBuilder.<T> newBuilder()
         .retryIfException(RETRY_EXCEPTION_PREDICATE)
         .withWaitStrategy(WaitStrategies.exponentialWait(config.getLong(RETRY_MULTIPLIER),
@@ -113,4 +118,12 @@ public class RetryerFactory<T> {
         .withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS), TimeUnit.MILLISECONDS))
         .build();
   }
+
+  /** Fixed RETRY_INTERVAL_MS wait between attempts; stops after RETRY_TIMES attempts (no overall timeout). */
+  private static <T> Retryer<T> newFixedAttemptBoundRetryer(Config config) {
+    final long intervalMs = config.getLong(RETRY_INTERVAL_MS);
+    return RetryerBuilder.<T> newBuilder().retryIfException(RETRY_EXCEPTION_PREDICATE)
+        .withWaitStrategy(WaitStrategies.fixedWait(intervalMs, TimeUnit.MILLISECONDS))
+        .withStopStrategy(StopStrategies.stopAfterAttempt(config.getInt(RETRY_TIMES))).build();
+  }
 }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index 6e8c72f..5b3fa7c 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -70,16 +70,16 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
     services.addAll(super.getServices());
     if (UserGroupInformation.isSecurityEnabled()) {
       LOGGER.info("Adding YarnContainerSecurityManager since security is enabled");
-      services.add(new YarnContainerSecurityManager(this.config, this.fs, this.eventBus));
+      services.add(new YarnContainerSecurityManager(this.clusterConfig, this.fs, this.eventBus));
     }
 
-    if (config.hasPath(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY)) {
+    if (clusterConfig.hasPath(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY)) {
       GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
-      String containerLogDir = config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
+      String containerLogDir = clusterConfig.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
 
       if (gobblinYarnLogSource.isLogSourcePresent()) {
         try {
-            services.add(gobblinYarnLogSource.buildLogCopier(this.config, this.taskRunnerId, this.fs,
+            services.add(gobblinYarnLogSource.buildLogCopier(this.clusterConfig, this.taskRunnerId, this.fs,
                 new Path(containerLogDir, GobblinClusterUtils.getAppWorkDirPath(this.applicationName, this.applicationId))));
         } catch (Exception e) {
           LOGGER.warn("Cannot add LogCopier service to the service manager due to {}", e);
@@ -195,7 +195,7 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
       if (!Strings.isNullOrEmpty(helixInstanceTags)) {
         config = config.withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, ConfigValueFactory.fromAnyRef(helixInstanceTags));
       }
-        
+
       GobblinTaskRunner gobblinTaskRunner =
           new GobblinYarnTaskRunner(applicationName, helixInstanceName, containerId, config,
               Optional.<Path>absent());