You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/03 16:29:12 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1107] Lazily initialize Helix TaskStateModelFactory in GobblinTaskRunner

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ef461a  [GOBBLIN-1107] Lazily initialize Helix TaskStateModelFactory in Gobbli…
9ef461a is described below

commit 9ef461ad37bb54c023bd383608a3c4024adad06f
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Apr 3 09:29:02 2020 -0700

    [GOBBLIN-1107] Lazily initialize Helix TaskStateModelFactory in Gobbli…
    
    Closes #2947 from sv2000/taskRunner
---
 .../apache/gobblin/cluster/GobblinTaskRunner.java  | 96 +++++++++++-----------
 .../gobblin/cluster/ClusterIntegrationTest.java    | 12 ++-
 .../cluster/ClusterIntegrationTestUtils.java       | 76 +++++++++++++++++
 .../gobblin/cluster/GobblinTaskRunnerTest.java     | 93 +++++++++++++++++----
 .../cluster/HelixAssignedParticipantCheckTest.java | 37 +++------
 .../cluster/suite/IntegrationBasicSuite.java       |  9 +-
 .../cluster/suite/IntegrationJobCancelSuite.java   | 19 +----
 .../suite/IntegrationJobRestartViaSpecSuite.java   | 14 +---
 8 files changed, 236 insertions(+), 120 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 2d35dea..7ed555e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -120,47 +120,37 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
   public static final String CLUSTER_APP_WORK_DIR = GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + "appWorkDir";
 
   private static final Logger logger = LoggerFactory.getLogger(GobblinTaskRunner.class);
-  static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf");
 
+  static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf");
   static final String GOBBLIN_TASK_FACTORY_NAME = "GobblinTaskFactory";
-
   static final String GOBBLIN_JOB_FACTORY_NAME = "GobblinJobFactory";
 
   private final String helixInstanceName;
-
   private final String clusterName;
+  private final Optional<ContainerMetrics> containerMetrics;
+  private final List<Service> services = Lists.newArrayList();
+  private final Path appWorkPath;
 
   @Getter
   private HelixManager jobHelixManager;
-
   private Optional<HelixManager> taskDriverHelixManager = Optional.absent();
-
-  private final ServiceManager serviceManager;
-
-  private final TaskStateModelFactory taskStateModelFactory;
-
-  private final Optional<ContainerMetrics> containerMetrics;
-
-  protected final String taskRunnerId;
-
+  private ServiceManager serviceManager;
+  private TaskStateModelFactory taskStateModelFactory;
+  private boolean isTaskDriver;
+  private boolean dedicatedTaskDriverCluster;
+  private Collection<StandardMetricsBridge.StandardMetrics> metricsCollection;
+  @Getter
+  private volatile boolean started = false;
   private volatile boolean stopInProgress = false;
-
   private volatile boolean isStopped = false;
 
+  protected final String taskRunnerId;
   protected final EventBus eventBus = new EventBus(GobblinTaskRunner.class.getSimpleName());
-
   protected final Config clusterConfig;
-
   @Getter
   protected final FileSystem fs;
-  private final List<Service> services = Lists.newArrayList();
   protected final String applicationName;
   protected final String applicationId;
-  private final Path appWorkPath;
-  private boolean isTaskDriver;
-  private boolean dedicatedTaskDriverCluster;
-
-  private final Collection<StandardMetricsBridge.StandardMetrics> metricsCollection;
 
   public GobblinTaskRunner(String applicationName,
       String helixInstanceName,
@@ -191,6 +181,17 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
 
     this.containerMetrics = buildContainerMetrics();
 
+    logger.info("GobblinTaskRunner({}): applicationName {}, helixInstanceName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
+        this.isTaskDriver? "taskDriver" : "worker",
+        applicationName,
+        helixInstanceName,
+        applicationId,
+        taskRunnerId,
+        config,
+        appWorkDirOptional);
+  }
+
+  private TaskRunnerSuiteBase initTaskRunnerSuiteBase() throws ReflectiveOperationException {
     String builderStr = ConfigUtils.getString(this.clusterConfig,
         GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER,
         TaskRunnerSuiteBase.Builder.class.getName());
@@ -203,10 +204,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     }
 
     TaskRunnerSuiteBase.Builder builder = GobblinConstructorUtils.<TaskRunnerSuiteBase.Builder>invokeLongestConstructor(
-          new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class)
-              .resolveClass(builderStr), this.clusterConfig);
+        new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class)
+            .resolveClass(builderStr), this.clusterConfig);
 
-    TaskRunnerSuiteBase suite = builder.setAppWorkPath(this.appWorkPath)
+    return builder.setAppWorkPath(this.appWorkPath)
         .setContainerMetrics(this.containerMetrics)
         .setFileSystem(this.fs)
         .setJobHelixManager(this.jobHelixManager)
@@ -216,27 +217,6 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
         .setContainerId(taskRunnerId)
         .setHostName(hostName)
         .build();
-
-    this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactoryMap());
-    this.metricsCollection = suite.getMetricsCollection();
-    this.services.addAll(suite.getServices());
-
-    this.services.addAll(getServices());
-
-    if (this.services.isEmpty()) {
-      this.serviceManager = null;
-    } else {
-      this.serviceManager = new ServiceManager(services);
-    }
-
-    logger.info("GobblinTaskRunner({}): applicationName {}, helixInstanceName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
-        this.isTaskDriver?"taskDriver" : "worker",
-        applicationName,
-        helixInstanceName,
-        applicationId,
-        taskRunnerId,
-        config,
-        appWorkDirOptional);
   }
 
   private Path initAppWorkDir(Config config, Optional<Path> appWorkDirOptional) {
@@ -304,6 +284,27 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
 
     connectHelixManagerWithRetry();
 
+    TaskRunnerSuiteBase suite;
+    try {
+      suite = initTaskRunnerSuiteBase();
+      synchronized (this) {
+        this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactoryMap());
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    this.metricsCollection = suite.getMetricsCollection();
+    this.services.addAll(suite.getServices());
+
+    this.services.addAll(getServices());
+
+    if (this.services.isEmpty()) {
+      this.serviceManager = null;
+    } else {
+      this.serviceManager = new ServiceManager(services);
+    }
+
     addInstanceTags();
 
     // Start metric reporting
@@ -315,7 +316,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
 
     if (this.serviceManager != null) {
       this.serviceManager.startAsync();
+      started = true;
       this.serviceManager.awaitStopped();
+    } else {
+      started = true;
     }
   }
 
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index e85413f..5d214a2 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -80,7 +80,9 @@ public class ClusterIntegrationTest {
 
   @Test
   void testJobShouldGetCancelled() throws Exception {
-    this.suite =new IntegrationJobCancelSuite();
+    Config jobConfigOverrides = ClusterIntegrationTestUtils.buildSleepingJob(IntegrationJobCancelSuite.JOB_ID,
+        IntegrationJobCancelSuite.TASK_STATE_FILE);
+    this.suite =new IntegrationJobCancelSuite(jobConfigOverrides);
     HelixManager helixManager = getHelixManager();
     suite.startCluster();
     helixManager.connect();
@@ -121,7 +123,9 @@ public class ClusterIntegrationTest {
    */
   @Test (dependsOnMethods = { "testJobShouldGetCancelled" }, groups = {"disabledOnTravis"})
   public void testJobRestartViaSpec() throws Exception {
-    this.suite = new IntegrationJobRestartViaSpecSuite();
+    Config jobConfigOverrides = ClusterIntegrationTestUtils.buildSleepingJob(IntegrationJobCancelSuite.JOB_ID,
+        IntegrationJobCancelSuite.TASK_STATE_FILE);
+    this.suite = new IntegrationJobRestartViaSpecSuite(jobConfigOverrides);
     HelixManager helixManager = getHelixManager();
 
     IntegrationJobRestartViaSpecSuite restartViaSpecSuite = (IntegrationJobRestartViaSpecSuite) this.suite;
@@ -132,10 +136,10 @@ public class ClusterIntegrationTest {
     helixManager.connect();
 
     AssertWithBackoff.create().timeoutMs(30000).maxSleepMs(1000).backoffFactor(1).
-        assertTrue(isTaskStarted(helixManager, IntegrationJobCancelSuite.JOB_ID), "Waiting for the job to start...");
+        assertTrue(isTaskStarted(helixManager, IntegrationJobRestartViaSpecSuite.JOB_ID), "Waiting for the job to start...");
 
     AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
-        assertTrue(isTaskRunning(IntegrationJobCancelSuite.TASK_STATE_FILE), "Waiting for the task to enter running state");
+        assertTrue(isTaskRunning(IntegrationJobRestartViaSpecSuite.TASK_STATE_FILE), "Waiting for the task to enter running state");
 
     ZkClient zkClient = new ZkClient(this.zkConnectString);
     PathBasedZkSerializer zkSerializer = ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTestUtils.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTestUtils.java
new file mode 100644
index 0000000..28d030a
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTestUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.manager.zk.ZkClient;
+import org.testng.Assert;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+public class ClusterIntegrationTestUtils {
+  /**
+   * A utility method to build a job that uses the {@link SleepingCustomTaskSource} with the provided config overrides.
+   * @param jobId
+   * @param taskStateFile
+   * @return job config with overrides
+   */
+  public static Config buildSleepingJob(String jobId, String taskStateFile) {
+    return buildSleepingJob(jobId, taskStateFile, 10L);
+  }
+
+  /**
+   * A utility method to build a job that uses the {@link SleepingCustomTaskSource} with the provided config overrides.
+   * @param jobId
+   * @param taskStateFile
+   * @param helixJobTimeoutSecs
+   * @return job config with overrides
+   */
+  public static Config buildSleepingJob(String jobId, String taskStateFile, Long helixJobTimeoutSecs) {
+    Config jobConfig = ConfigFactory.empty().withValue(SleepingTask.TASK_STATE_FILE_KEY, ConfigValueFactory.fromAnyRef(taskStateFile))
+        .withValue(ConfigurationKeys.JOB_ID_KEY, ConfigValueFactory.fromAnyRef(jobId))
+        .withValue(ConfigurationKeys.SOURCE_CLASS_KEY, ConfigValueFactory.fromAnyRef(SleepingCustomTaskSource.class.getName()))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, ConfigValueFactory.fromAnyRef(Boolean.TRUE))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, ConfigValueFactory.fromAnyRef(helixJobTimeoutSecs));
+    return jobConfig;
+  }
+
+  /**
+   * A utility method that creates a partial instance structure in ZK.
+   */
+  public static void createPartialInstanceStructure(HelixManager helixManager, String zkConnectString) {
+    //Connect and disconnect the helixManager to create a Helix Instance set up.
+    try {
+      helixManager.connect();
+      helixManager.disconnect();
+    } catch (Exception e) {
+      Assert.fail("Failed to connect to ZK");
+    }
+
+    //Delete ERRORS/HISTORY/STATUSUPDATES znodes under INSTANCES to simulate partial instance set up.
+    ZkClient zkClient = new ZkClient(zkConnectString);
+    zkClient.delete(PropertyPathBuilder.instanceError(helixManager.getClusterName(), helixManager.getInstanceName()));
+    zkClient.delete(PropertyPathBuilder.instanceHistory(helixManager.getClusterName(), helixManager.getInstanceName()));
+    zkClient.delete(PropertyPathBuilder.instanceStatusUpdate(helixManager.getClusterName(), helixManager.getInstanceName()));
+  }
+}
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
index 0e48661..98d13df 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
@@ -19,13 +19,16 @@ package org.apache.gobblin.cluster;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixException;
-import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -39,6 +42,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
+import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
 import org.apache.gobblin.testing.AssertWithBackoff;
 
 
@@ -56,6 +60,9 @@ import org.apache.gobblin.testing.AssertWithBackoff;
 public class GobblinTaskRunnerTest {
   public final static Logger LOG = LoggerFactory.getLogger(GobblinTaskRunnerTest.class);
 
+  private static final String JOB_ID = "job_taskRunnerTestJob_" + System.currentTimeMillis();
+  private static final String TASK_STATE_FILE = "/tmp/" + GobblinTaskRunnerTest.class.getSimpleName() + "/taskState/_RUNNING";
+
   public static final String HADOOP_OVERRIDE_PROPERTY_NAME = "prop";
 
   private TestingServer testingZKServer;
@@ -66,6 +73,7 @@ public class GobblinTaskRunnerTest {
   private GobblinTaskRunner corruptGobblinTaskRunner;
   private String clusterName;
   private String corruptHelixInstance;
+  private TaskAssignmentAfterConnectionRetry suite;
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -110,7 +118,19 @@ public class GobblinTaskRunnerTest {
 
   @Test
   public void testSendReceiveShutdownMessage() throws Exception {
+    ExecutorService service = Executors.newSingleThreadExecutor();
+    service.submit(() -> GobblinTaskRunnerTest.this.gobblinTaskRunner.start());
+
     Logger log = LoggerFactory.getLogger("testSendReceiveShutdownMessage");
+
+    // Give Helix some time to start the task runner
+    AssertWithBackoff.create().logger(log).timeoutMs(20000)
+        .assertTrue(new Predicate<Void>() {
+          @Override public boolean apply(Void input) {
+            return GobblinTaskRunnerTest.this.gobblinTaskRunner.isStarted();
+          }
+        }, "gobblinTaskRunner started");
+
     this.gobblinClusterManager.sendShutdownRequest();
 
     // Give Helix some time to handle the message
@@ -128,21 +148,13 @@ public class GobblinTaskRunnerTest {
     Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value");
   }
 
+
   @Test
   public void testConnectHelixManagerWithRetry() {
-    //Connect and disconnect the corrupt task runner to create a Helix Instance set up.
-    try {
-      this.corruptGobblinTaskRunner.connectHelixManager();
-      this.corruptGobblinTaskRunner.disconnectHelixManager();
-    } catch (Exception e) {
-      Assert.fail("Failed to connect to ZK");
-    }
+    HelixManager instanceManager = HelixManagerFactory.getZKHelixManager(
+        clusterName, corruptHelixInstance, InstanceType.PARTICIPANT, testingZKServer.getConnectString());
 
-    //Delete ERRORS/HISTORY/STATUSUPDATES znodes under INSTANCES to simulate partial instance set up.
-    ZkClient zkClient = new ZkClient(testingZKServer.getConnectString());
-    zkClient.delete(PropertyPathBuilder.instanceError(clusterName, corruptHelixInstance));
-    zkClient.delete(PropertyPathBuilder.instanceHistory(clusterName, corruptHelixInstance));
-    zkClient.delete(PropertyPathBuilder.instanceStatusUpdate(clusterName, corruptHelixInstance));
+    ClusterIntegrationTestUtils.createPartialInstanceStructure(instanceManager, testingZKServer.getConnectString());
 
     //Ensure that the connecting to Helix without retry will throw a HelixException
     try {
@@ -158,11 +170,62 @@ public class GobblinTaskRunnerTest {
     Assert.assertTrue(true);
   }
 
+  @Test (groups = {"disabledOnTravis"})
+  public void testTaskAssignmentAfterHelixConnectionRetry()
+      throws Exception {
+    Config jobConfigOverrides = ClusterIntegrationTestUtils.buildSleepingJob(JOB_ID, TASK_STATE_FILE);
+    this.suite = new TaskAssignmentAfterConnectionRetry(jobConfigOverrides);
+
+    String zkConnectString = suite.getManagerConfig().getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    String clusterName = suite.getManagerConfig().getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    //A test manager instance for observing the state of the cluster
+    HelixManager helixManager = HelixManagerFactory.getZKHelixManager(clusterName, "TestManager", InstanceType.SPECTATOR, zkConnectString);
+
+    suite.startCluster();
+
+    helixManager.connect();
+
+    //Ensure that Helix has created a workflow
+    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
+        assertTrue(ClusterIntegrationTest.isTaskStarted(helixManager, JOB_ID), "Waiting for the job to start...");
+
+    //Ensure that the SleepingTask is running
+    AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
+        assertTrue(ClusterIntegrationTest.isTaskRunning(TASK_STATE_FILE),"Waiting for the task to enter running state");
+
+    helixManager.disconnect();
+  }
+
+
+  public static class TaskAssignmentAfterConnectionRetry extends IntegrationBasicSuite {
+    TaskAssignmentAfterConnectionRetry(Config jobConfigOverrides) {
+      super(jobConfigOverrides);
+    }
+
+    @Override
+    protected void createHelixCluster() throws Exception {
+      super.createHelixCluster();
+      String clusterName = super.getManagerConfig().getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+      String zkConnectString = super.getManagerConfig().getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(clusterName, IntegrationBasicSuite.WORKER_INSTANCE_0, InstanceType.PARTICIPANT, zkConnectString);
+
+      //Create a partial instance setup
+      ClusterIntegrationTestUtils.createPartialInstanceStructure(helixManager, zkConnectString);
+    }
+  }
+
+
   @AfterClass
-  public void tearDown() throws IOException {
+  public void tearDown()
+      throws IOException, InterruptedException {
     try {
       this.gobblinClusterManager.disconnectHelixManager();
       this.gobblinTaskRunner.disconnectHelixManager();
+      this.corruptGobblinTaskRunner.disconnectHelixManager();
+      if (this.suite != null) {
+        this.suite.shutdownCluster();
+      }
     } finally {
       this.testingZKServer.close();
     }
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
index eec52b1..1aecd94 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
@@ -17,7 +17,6 @@
 package org.apache.gobblin.cluster;
 
 import java.io.IOException;
-import java.util.Map;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
@@ -27,27 +26,28 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
 import org.apache.gobblin.commit.CommitStepException;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.testing.AssertWithBackoff;
 
 
 public class HelixAssignedParticipantCheckTest {
-  private IntegrationJobSuite suite;
+  private static final String JOB_ID = "job_testJob_345";
+  private static final String TASK_STATE_FILE = "/tmp/" + HelixAssignedParticipantCheckTest.class.getSimpleName() + "/taskState/_RUNNING";
+
+  private IntegrationBasicSuite suite;
   private HelixManager helixManager;
   private Config helixConfig;
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    //Set up a Gobblin Helix cluster
-    suite = new IntegrationJobSuite();
+    Config jobConfigOverrides = ClusterIntegrationTestUtils.buildSleepingJob(JOB_ID, TASK_STATE_FILE);
+    //Set up a Gobblin Helix cluster integration job
+    suite = new IntegrationBasicSuite(jobConfigOverrides);
 
     helixConfig = suite.getManagerConfig();
     String clusterName = helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
@@ -66,10 +66,10 @@ public class HelixAssignedParticipantCheckTest {
 
     //Ensure that Helix has created a workflow
     AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
-        assertTrue(ClusterIntegrationTest.isTaskStarted(helixManager, IntegrationJobSuite.JOB_ID), "Waiting for the job to start...");
+        assertTrue(ClusterIntegrationTest.isTaskStarted(helixManager, JOB_ID), "Waiting for the job to start...");
 
     //Instantiate config for HelixAssignedParticipantCheck
-    String helixJobId = Joiner.on("_").join(IntegrationJobSuite.JOB_ID, IntegrationJobSuite.JOB_ID);
+    String helixJobId = Joiner.on("_").join(JOB_ID, JOB_ID);
     helixConfig = helixConfig.withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
         ConfigValueFactory.fromAnyRef(IntegrationBasicSuite.WORKER_INSTANCE_0))
         .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(helixJobId))
@@ -78,7 +78,7 @@ public class HelixAssignedParticipantCheckTest {
 
     //Ensure that the SleepingTask is running
     AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
-        assertTrue(ClusterIntegrationTest.isTaskRunning(IntegrationJobSuite.TASK_STATE_FILE),"Waiting for the task to enter running state");
+        assertTrue(ClusterIntegrationTest.isTaskRunning(TASK_STATE_FILE),"Waiting for the task to enter running state");
 
     //Run the check. Ensure that the configured Helix instance is indeed the assigned participant
     // (i.e. no exceptions thrown).
@@ -104,21 +104,4 @@ public class HelixAssignedParticipantCheckTest {
       helixManager.disconnect();
     }
   }
-
-  public static class IntegrationJobSuite extends IntegrationBasicSuite {
-    public static final String JOB_ID = "job_testJob_345";
-    public static final String TASK_STATE_FILE = "/tmp/" + IntegrationJobSuite.class.getSimpleName() + "/taskState/_RUNNING";
-
-
-    @Override
-    protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
-      Config newConfig = ConfigFactory.parseMap(ImmutableMap.of(
-          ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource",
-          ConfigurationKeys.JOB_ID_KEY, JOB_ID,
-          GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
-          GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L, SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE))
-          .withFallback(rawJobConfig);
-      return ImmutableMap.of(JOB_NAME, newConfig);
-    }
-  }
 }
\ No newline at end of file
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
index e6c4587..4f045bb 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
@@ -76,6 +76,8 @@ public class IntegrationBasicSuite {
   public static final String WORKER_INSTANCE_0 = "WorkerInstance_0";
   public static final String TEST_INSTANCE_NAME_KEY = "worker.instance.name";
 
+  protected final Config jobConfigOverrides;
+
   // manager and workers
   protected Config managerConfig;
   protected Collection<Config> taskDriverConfigs = Lists.newArrayList();
@@ -93,6 +95,11 @@ public class IntegrationBasicSuite {
   protected TestingServer testingZKServer;
 
   public IntegrationBasicSuite() {
+    this(ConfigFactory.empty());
+  }
+
+  public IntegrationBasicSuite(Config jobConfigOverrides) {
+    this.jobConfigOverrides = jobConfigOverrides;
     try {
       initWorkDir();
       initJobOutputDir();
@@ -163,7 +170,7 @@ public class IntegrationBasicSuite {
   }
 
   protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
-    return ImmutableMap.of(JOB_NAME, rawJobConfig);
+    return ImmutableMap.of(JOB_NAME, this.jobConfigOverrides.withFallback(rawJobConfig));
   }
 
   private void writeJobConf(String jobName, Config jobConfig) throws IOException {
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
index d6c305e..b16febb 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
@@ -17,32 +17,17 @@
 
 package org.apache.gobblin.cluster.suite;
 
-import java.util.Map;
-
 import org.junit.Assert;
 
-import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.gobblin.cluster.SleepingTask;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 
 
 public class IntegrationJobCancelSuite extends IntegrationBasicSuite {
   public static final String JOB_ID = "job_HelloWorldTestJob_1234";
   public static final String TASK_STATE_FILE = "/tmp/IntegrationJobCancelSuite/taskState/_RUNNING";
 
-  @Override
-  protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
-    Config newConfig = ConfigFactory.parseMap(ImmutableMap.of(
-        ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource",
-        ConfigurationKeys.JOB_ID_KEY, JOB_ID,
-        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
-        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L, SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE))
-        .withFallback(rawJobConfig);
-    return ImmutableMap.of(JOB_NAME, newConfig);
+  public IntegrationJobCancelSuite(Config jobConfigOverrides) {
+    super(jobConfigOverrides);
   }
 
   @Override
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
index c84c1e7..412603f 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
@@ -27,7 +27,6 @@ import java.net.URISyntaxException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Files;
 import com.google.common.io.Resources;
 import com.typesafe.config.Config;
@@ -36,10 +35,10 @@ import com.typesafe.config.ConfigParseOptions;
 import com.typesafe.config.ConfigSyntax;
 import com.typesafe.config.ConfigValueFactory;
 
+import org.apache.gobblin.cluster.ClusterIntegrationTestUtils;
 import org.apache.gobblin.cluster.FsJobConfigurationManager;
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.SleepingTask;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FsSpecConsumer;
 import org.apache.gobblin.runtime.api.FsSpecProducer;
 import org.apache.gobblin.runtime.api.JobSpec;
@@ -53,8 +52,8 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite
 
   private final SpecProducer _specProducer;
 
-  public IntegrationJobRestartViaSpecSuite() throws IOException {
-    super();
+  public IntegrationJobRestartViaSpecSuite(Config jobConfigOverrides) throws IOException {
+    super(jobConfigOverrides);
     FileSystem fs = FileSystem.getLocal(new Configuration());
     this._specProducer = new FsSpecProducer(fs, ConfigFactory.empty().withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR)));
   }
@@ -66,12 +65,7 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite
           ConfigFactory.parseReader(reader, ConfigParseOptions.defaults().setSyntax(ConfigSyntax.CONF));
       rawJobConfig = rawJobConfig.withFallback(getClusterConfig());
 
-      Config newConfig = ConfigFactory.parseMap(ImmutableMap
-          .of(ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource",
-              ConfigurationKeys.JOB_ID_KEY, JOB_ID,
-              GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
-              GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 100L,
-              ConfigurationKeys.JOB_NAME_KEY, JOB_NAME));
+      Config newConfig = ClusterIntegrationTestUtils.buildSleepingJob(JOB_ID, TASK_STATE_FILE, 100L);
 
       newConfig = newConfig.withValue(SleepingTask.TASK_STATE_FILE_KEY, ConfigValueFactory.fromAnyRef(TASK_STATE_FILE));
       newConfig = newConfig.withFallback(rawJobConfig);