You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/03 16:29:12 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1107] Lazily initialize Helix TaskStateModelFactory in GobblinTaskRunner
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9ef461a [GOBBLIN-1107] Lazily initialize Helix TaskStateModelFactory in GobblinTaskRunner
9ef461a is described below
commit 9ef461ad37bb54c023bd383608a3c4024adad06f
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Apr 3 09:29:02 2020 -0700
[GOBBLIN-1107] Lazily initialize Helix TaskStateModelFactory in GobblinTaskRunner
Closes #2947 from sv2000/taskRunner
---
.../apache/gobblin/cluster/GobblinTaskRunner.java | 96 +++++++++++-----------
.../gobblin/cluster/ClusterIntegrationTest.java | 12 ++-
.../cluster/ClusterIntegrationTestUtils.java | 76 +++++++++++++++++
.../gobblin/cluster/GobblinTaskRunnerTest.java | 93 +++++++++++++++++----
.../cluster/HelixAssignedParticipantCheckTest.java | 37 +++------
.../cluster/suite/IntegrationBasicSuite.java | 9 +-
.../cluster/suite/IntegrationJobCancelSuite.java | 19 +----
.../suite/IntegrationJobRestartViaSpecSuite.java | 14 +---
8 files changed, 236 insertions(+), 120 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 2d35dea..7ed555e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -120,47 +120,37 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
public static final String CLUSTER_APP_WORK_DIR = GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + "appWorkDir";
private static final Logger logger = LoggerFactory.getLogger(GobblinTaskRunner.class);
- static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf");
+ static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf");
static final String GOBBLIN_TASK_FACTORY_NAME = "GobblinTaskFactory";
-
static final String GOBBLIN_JOB_FACTORY_NAME = "GobblinJobFactory";
private final String helixInstanceName;
-
private final String clusterName;
+ private final Optional<ContainerMetrics> containerMetrics;
+ private final List<Service> services = Lists.newArrayList();
+ private final Path appWorkPath;
@Getter
private HelixManager jobHelixManager;
-
private Optional<HelixManager> taskDriverHelixManager = Optional.absent();
-
- private final ServiceManager serviceManager;
-
- private final TaskStateModelFactory taskStateModelFactory;
-
- private final Optional<ContainerMetrics> containerMetrics;
-
- protected final String taskRunnerId;
-
+ private ServiceManager serviceManager;
+ private TaskStateModelFactory taskStateModelFactory;
+ private boolean isTaskDriver;
+ private boolean dedicatedTaskDriverCluster;
+ private Collection<StandardMetricsBridge.StandardMetrics> metricsCollection;
+ @Getter
+ private volatile boolean started = false;
private volatile boolean stopInProgress = false;
-
private volatile boolean isStopped = false;
+ protected final String taskRunnerId;
protected final EventBus eventBus = new EventBus(GobblinTaskRunner.class.getSimpleName());
-
protected final Config clusterConfig;
-
@Getter
protected final FileSystem fs;
- private final List<Service> services = Lists.newArrayList();
protected final String applicationName;
protected final String applicationId;
- private final Path appWorkPath;
- private boolean isTaskDriver;
- private boolean dedicatedTaskDriverCluster;
-
- private final Collection<StandardMetricsBridge.StandardMetrics> metricsCollection;
public GobblinTaskRunner(String applicationName,
String helixInstanceName,
@@ -191,6 +181,17 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
this.containerMetrics = buildContainerMetrics();
+ logger.info("GobblinTaskRunner({}): applicationName {}, helixInstanceName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
+ this.isTaskDriver? "taskDriver" : "worker",
+ applicationName,
+ helixInstanceName,
+ applicationId,
+ taskRunnerId,
+ config,
+ appWorkDirOptional);
+ }
+
+ private TaskRunnerSuiteBase initTaskRunnerSuiteBase() throws ReflectiveOperationException {
String builderStr = ConfigUtils.getString(this.clusterConfig,
GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER,
TaskRunnerSuiteBase.Builder.class.getName());
@@ -203,10 +204,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
}
TaskRunnerSuiteBase.Builder builder = GobblinConstructorUtils.<TaskRunnerSuiteBase.Builder>invokeLongestConstructor(
- new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class)
- .resolveClass(builderStr), this.clusterConfig);
+ new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class)
+ .resolveClass(builderStr), this.clusterConfig);
- TaskRunnerSuiteBase suite = builder.setAppWorkPath(this.appWorkPath)
+ return builder.setAppWorkPath(this.appWorkPath)
.setContainerMetrics(this.containerMetrics)
.setFileSystem(this.fs)
.setJobHelixManager(this.jobHelixManager)
@@ -216,27 +217,6 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
.setContainerId(taskRunnerId)
.setHostName(hostName)
.build();
-
- this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactoryMap());
- this.metricsCollection = suite.getMetricsCollection();
- this.services.addAll(suite.getServices());
-
- this.services.addAll(getServices());
-
- if (this.services.isEmpty()) {
- this.serviceManager = null;
- } else {
- this.serviceManager = new ServiceManager(services);
- }
-
- logger.info("GobblinTaskRunner({}): applicationName {}, helixInstanceName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
- this.isTaskDriver?"taskDriver" : "worker",
- applicationName,
- helixInstanceName,
- applicationId,
- taskRunnerId,
- config,
- appWorkDirOptional);
}
private Path initAppWorkDir(Config config, Optional<Path> appWorkDirOptional) {
@@ -304,6 +284,27 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
connectHelixManagerWithRetry();
+ TaskRunnerSuiteBase suite;
+ try {
+ suite = initTaskRunnerSuiteBase();
+ synchronized (this) {
+ this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactoryMap());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ this.metricsCollection = suite.getMetricsCollection();
+ this.services.addAll(suite.getServices());
+
+ this.services.addAll(getServices());
+
+ if (this.services.isEmpty()) {
+ this.serviceManager = null;
+ } else {
+ this.serviceManager = new ServiceManager(services);
+ }
+
addInstanceTags();
// Start metric reporting
@@ -315,7 +316,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
if (this.serviceManager != null) {
this.serviceManager.startAsync();
+ started = true;
this.serviceManager.awaitStopped();
+ } else {
+ started = true;
}
}
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index e85413f..5d214a2 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -80,7 +80,9 @@ public class ClusterIntegrationTest {
@Test
void testJobShouldGetCancelled() throws Exception {
- this.suite =new IntegrationJobCancelSuite();
+ Config jobConfigOverrides = ClusterIntegrationTestUtils.buildSleepingJob(IntegrationJobCancelSuite.JOB_ID,
+ IntegrationJobCancelSuite.TASK_STATE_FILE);
+ this.suite =new IntegrationJobCancelSuite(jobConfigOverrides);
HelixManager helixManager = getHelixManager();
suite.startCluster();
helixManager.connect();
@@ -121,7 +123,9 @@ public class ClusterIntegrationTest {
*/
@Test (dependsOnMethods = { "testJobShouldGetCancelled" }, groups = {"disabledOnTravis"})
public void testJobRestartViaSpec() throws Exception {
- this.suite = new IntegrationJobRestartViaSpecSuite();
+ Config jobConfigOverrides = ClusterIntegrationTestUtils.buildSleepingJob(IntegrationJobCancelSuite.JOB_ID,
+ IntegrationJobCancelSuite.TASK_STATE_FILE);
+ this.suite = new IntegrationJobRestartViaSpecSuite(jobConfigOverrides);
HelixManager helixManager = getHelixManager();
IntegrationJobRestartViaSpecSuite restartViaSpecSuite = (IntegrationJobRestartViaSpecSuite) this.suite;
@@ -132,10 +136,10 @@ public class ClusterIntegrationTest {
helixManager.connect();
AssertWithBackoff.create().timeoutMs(30000).maxSleepMs(1000).backoffFactor(1).
- assertTrue(isTaskStarted(helixManager, IntegrationJobCancelSuite.JOB_ID), "Waiting for the job to start...");
+ assertTrue(isTaskStarted(helixManager, IntegrationJobRestartViaSpecSuite.JOB_ID), "Waiting for the job to start...");
AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
- assertTrue(isTaskRunning(IntegrationJobCancelSuite.TASK_STATE_FILE), "Waiting for the task to enter running state");
+ assertTrue(isTaskRunning(IntegrationJobRestartViaSpecSuite.TASK_STATE_FILE), "Waiting for the task to enter running state");
ZkClient zkClient = new ZkClient(this.zkConnectString);
PathBasedZkSerializer zkSerializer = ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTestUtils.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTestUtils.java
new file mode 100644
index 0000000..28d030a
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTestUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.manager.zk.ZkClient;
+import org.testng.Assert;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+public class ClusterIntegrationTestUtils {
+ /**
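+ * A utility method to build a job config that uses the {@link SleepingCustomTaskSource}, with a default Helix job timeout of 10 seconds.
+ * @param jobId the job id set under {@link ConfigurationKeys#JOB_ID_KEY}
+ * @param taskStateFile the path set under {@link SleepingTask#TASK_STATE_FILE_KEY}
+ * @return a config containing only these overrides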
+ */
+ public static Config buildSleepingJob(String jobId, String taskStateFile) {
+ return buildSleepingJob(jobId, taskStateFile, 10L);
+ }
+
+ /**
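+ * A utility method to build a job config that uses the {@link SleepingCustomTaskSource} and has the Helix job timeout enabled.
+ * @param jobId the job id set under {@link ConfigurationKeys#JOB_ID_KEY}
+ * @param taskStateFile the path set under {@link SleepingTask#TASK_STATE_FILE_KEY}
+ * @param helixJobTimeoutSecs the timeout, in seconds, set under {@link GobblinClusterConfigurationKeys#HELIX_JOB_TIMEOUT_SECONDS}
+ * @return a config containing only these overrides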
+ */
+ public static Config buildSleepingJob(String jobId, String taskStateFile, Long helixJobTimeoutSecs) {
+ Config jobConfig = ConfigFactory.empty().withValue(SleepingTask.TASK_STATE_FILE_KEY, ConfigValueFactory.fromAnyRef(taskStateFile))
+ .withValue(ConfigurationKeys.JOB_ID_KEY, ConfigValueFactory.fromAnyRef(jobId))
+ .withValue(ConfigurationKeys.SOURCE_CLASS_KEY, ConfigValueFactory.fromAnyRef(SleepingCustomTaskSource.class.getName()))
+ .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, ConfigValueFactory.fromAnyRef(Boolean.TRUE))
+ .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, ConfigValueFactory.fromAnyRef(helixJobTimeoutSecs));
+ return jobConfig;
+ }
+
+ /**
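+ * A utility method that creates a partial instance structure in ZK: it connects and disconnects the given manager, then deletes the instance's ERRORS/HISTORY/STATUSUPDATES znodes.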
+ */
+ public static void createPartialInstanceStructure(HelixManager helixManager, String zkConnectString) {
+ //Connect and disconnect the helixManager to create a Helix Instance set up.
+ try {
+ helixManager.connect();
+ helixManager.disconnect();
+ } catch (Exception e) {
+ Assert.fail("Failed to connect to ZK");
+ }
+
+ //Delete ERRORS/HISTORY/STATUSUPDATES znodes under INSTANCES to simulate partial instance set up.
+ ZkClient zkClient = new ZkClient(zkConnectString);
+ zkClient.delete(PropertyPathBuilder.instanceError(helixManager.getClusterName(), helixManager.getInstanceName()));
+ zkClient.delete(PropertyPathBuilder.instanceHistory(helixManager.getClusterName(), helixManager.getInstanceName()));
+ zkClient.delete(PropertyPathBuilder.instanceStatusUpdate(helixManager.getClusterName(), helixManager.getInstanceName()));
+ }
+}
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
index 0e48661..98d13df 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
@@ -19,13 +19,16 @@ package org.apache.gobblin.cluster;
import java.io.IOException;
import java.net.URL;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixException;
-import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -39,6 +42,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
import org.apache.gobblin.testing.AssertWithBackoff;
@@ -56,6 +60,9 @@ import org.apache.gobblin.testing.AssertWithBackoff;
public class GobblinTaskRunnerTest {
public final static Logger LOG = LoggerFactory.getLogger(GobblinTaskRunnerTest.class);
+ private static final String JOB_ID = "job_taskRunnerTestJob_" + System.currentTimeMillis();
+ private static final String TASK_STATE_FILE = "/tmp/" + GobblinTaskRunnerTest.class.getSimpleName() + "/taskState/_RUNNING";
+
public static final String HADOOP_OVERRIDE_PROPERTY_NAME = "prop";
private TestingServer testingZKServer;
@@ -66,6 +73,7 @@ public class GobblinTaskRunnerTest {
private GobblinTaskRunner corruptGobblinTaskRunner;
private String clusterName;
private String corruptHelixInstance;
+ private TaskAssignmentAfterConnectionRetry suite;
@BeforeClass
public void setUp() throws Exception {
@@ -110,7 +118,19 @@ public class GobblinTaskRunnerTest {
@Test
public void testSendReceiveShutdownMessage() throws Exception {
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ service.submit(() -> GobblinTaskRunnerTest.this.gobblinTaskRunner.start());
+
Logger log = LoggerFactory.getLogger("testSendReceiveShutdownMessage");
+
+ // Give Helix some time to start the task runner
+ AssertWithBackoff.create().logger(log).timeoutMs(20000)
+ .assertTrue(new Predicate<Void>() {
+ @Override public boolean apply(Void input) {
+ return GobblinTaskRunnerTest.this.gobblinTaskRunner.isStarted();
+ }
+ }, "gobblinTaskRunner started");
+
this.gobblinClusterManager.sendShutdownRequest();
// Give Helix some time to handle the message
@@ -128,21 +148,13 @@ public class GobblinTaskRunnerTest {
Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value");
}
+
@Test
public void testConnectHelixManagerWithRetry() {
- //Connect and disconnect the corrupt task runner to create a Helix Instance set up.
- try {
- this.corruptGobblinTaskRunner.connectHelixManager();
- this.corruptGobblinTaskRunner.disconnectHelixManager();
- } catch (Exception e) {
- Assert.fail("Failed to connect to ZK");
- }
+ HelixManager instanceManager = HelixManagerFactory.getZKHelixManager(
+ clusterName, corruptHelixInstance, InstanceType.PARTICIPANT, testingZKServer.getConnectString());
- //Delete ERRORS/HISTORY/STATUSUPDATES znodes under INSTANCES to simulate partial instance set up.
- ZkClient zkClient = new ZkClient(testingZKServer.getConnectString());
- zkClient.delete(PropertyPathBuilder.instanceError(clusterName, corruptHelixInstance));
- zkClient.delete(PropertyPathBuilder.instanceHistory(clusterName, corruptHelixInstance));
- zkClient.delete(PropertyPathBuilder.instanceStatusUpdate(clusterName, corruptHelixInstance));
+ ClusterIntegrationTestUtils.createPartialInstanceStructure(instanceManager, testingZKServer.getConnectString());
//Ensure that the connecting to Helix without retry will throw a HelixException
try {
@@ -158,11 +170,62 @@ public class GobblinTaskRunnerTest {
Assert.assertTrue(true);
}
+ @Test (groups = {"disabledOnTravis"})
+ public void testTaskAssignmentAfterHelixConnectionRetry()
+ throws Exception {
+ Config jobConfigOverrides = ClusterIntegrationTestUtils.buildSleepingJob(JOB_ID, TASK_STATE_FILE);
+ this.suite = new TaskAssignmentAfterConnectionRetry(jobConfigOverrides);
+
+ String zkConnectString = suite.getManagerConfig().getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+ String clusterName = suite.getManagerConfig().getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+ //A test manager instance for observing the state of the cluster
+ HelixManager helixManager = HelixManagerFactory.getZKHelixManager(clusterName, "TestManager", InstanceType.SPECTATOR, zkConnectString);
+
+ suite.startCluster();
+
+ helixManager.connect();
+
+ //Ensure that Helix has created a workflow
+ AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
+ assertTrue(ClusterIntegrationTest.isTaskStarted(helixManager, JOB_ID), "Waiting for the job to start...");
+
+ //Ensure that the SleepingTask is running
+ AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
+ assertTrue(ClusterIntegrationTest.isTaskRunning(TASK_STATE_FILE),"Waiting for the task to enter running state");
+
+ helixManager.disconnect();
+ }
+
+
+ public static class TaskAssignmentAfterConnectionRetry extends IntegrationBasicSuite {
+ TaskAssignmentAfterConnectionRetry(Config jobConfigOverrides) {
+ super(jobConfigOverrides);
+ }
+
+ @Override
+ protected void createHelixCluster() throws Exception {
+ super.createHelixCluster();
+ String clusterName = super.getManagerConfig().getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+ String zkConnectString = super.getManagerConfig().getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(clusterName, IntegrationBasicSuite.WORKER_INSTANCE_0, InstanceType.PARTICIPANT, zkConnectString);
+
+ //Create a partial instance setup
+ ClusterIntegrationTestUtils.createPartialInstanceStructure(helixManager, zkConnectString);
+ }
+ }
+
+
@AfterClass
- public void tearDown() throws IOException {
+ public void tearDown()
+ throws IOException, InterruptedException {
try {
this.gobblinClusterManager.disconnectHelixManager();
this.gobblinTaskRunner.disconnectHelixManager();
+ this.corruptGobblinTaskRunner.disconnectHelixManager();
+ if (this.suite != null) {
+ this.suite.shutdownCluster();
+ }
} finally {
this.testingZKServer.close();
}
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
index eec52b1..1aecd94 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.cluster;
import java.io.IOException;
-import java.util.Map;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
@@ -27,27 +26,28 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
import org.apache.gobblin.commit.CommitStepException;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.testing.AssertWithBackoff;
public class HelixAssignedParticipantCheckTest {
- private IntegrationJobSuite suite;
+ private static final String JOB_ID = "job_testJob_345";
+ private static final String TASK_STATE_FILE = "/tmp/" + HelixAssignedParticipantCheckTest.class.getSimpleName() + "/taskState/_RUNNING";
+
+ private IntegrationBasicSuite suite;
private HelixManager helixManager;
private Config helixConfig;
@BeforeClass
public void setUp()
throws Exception {
- //Set up a Gobblin Helix cluster
- suite = new IntegrationJobSuite();
+ Config jobConfigOverrides = ClusterIntegrationTestUtils.buildSleepingJob(JOB_ID, TASK_STATE_FILE);
+ //Set up a Gobblin Helix cluster integration job
+ suite = new IntegrationBasicSuite(jobConfigOverrides);
helixConfig = suite.getManagerConfig();
String clusterName = helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
@@ -66,10 +66,10 @@ public class HelixAssignedParticipantCheckTest {
//Ensure that Helix has created a workflow
AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
- assertTrue(ClusterIntegrationTest.isTaskStarted(helixManager, IntegrationJobSuite.JOB_ID), "Waiting for the job to start...");
+ assertTrue(ClusterIntegrationTest.isTaskStarted(helixManager, JOB_ID), "Waiting for the job to start...");
//Instantiate config for HelixAssignedParticipantCheck
- String helixJobId = Joiner.on("_").join(IntegrationJobSuite.JOB_ID, IntegrationJobSuite.JOB_ID);
+ String helixJobId = Joiner.on("_").join(JOB_ID, JOB_ID);
helixConfig = helixConfig.withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
ConfigValueFactory.fromAnyRef(IntegrationBasicSuite.WORKER_INSTANCE_0))
.withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(helixJobId))
@@ -78,7 +78,7 @@ public class HelixAssignedParticipantCheckTest {
//Ensure that the SleepingTask is running
AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
- assertTrue(ClusterIntegrationTest.isTaskRunning(IntegrationJobSuite.TASK_STATE_FILE),"Waiting for the task to enter running state");
+ assertTrue(ClusterIntegrationTest.isTaskRunning(TASK_STATE_FILE),"Waiting for the task to enter running state");
//Run the check. Ensure that the configured Helix instance is indeed the assigned participant
// (i.e. no exceptions thrown).
@@ -104,21 +104,4 @@ public class HelixAssignedParticipantCheckTest {
helixManager.disconnect();
}
}
-
- public static class IntegrationJobSuite extends IntegrationBasicSuite {
- public static final String JOB_ID = "job_testJob_345";
- public static final String TASK_STATE_FILE = "/tmp/" + IntegrationJobSuite.class.getSimpleName() + "/taskState/_RUNNING";
-
-
- @Override
- protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
- Config newConfig = ConfigFactory.parseMap(ImmutableMap.of(
- ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource",
- ConfigurationKeys.JOB_ID_KEY, JOB_ID,
- GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
- GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L, SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE))
- .withFallback(rawJobConfig);
- return ImmutableMap.of(JOB_NAME, newConfig);
- }
- }
}
\ No newline at end of file
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
index e6c4587..4f045bb 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
@@ -76,6 +76,8 @@ public class IntegrationBasicSuite {
public static final String WORKER_INSTANCE_0 = "WorkerInstance_0";
public static final String TEST_INSTANCE_NAME_KEY = "worker.instance.name";
+ protected final Config jobConfigOverrides;
+
// manager and workers
protected Config managerConfig;
protected Collection<Config> taskDriverConfigs = Lists.newArrayList();
@@ -93,6 +95,11 @@ public class IntegrationBasicSuite {
protected TestingServer testingZKServer;
public IntegrationBasicSuite() {
+ this(ConfigFactory.empty());
+ }
+
+ public IntegrationBasicSuite(Config jobConfigOverrides) {
+ this.jobConfigOverrides = jobConfigOverrides;
try {
initWorkDir();
initJobOutputDir();
@@ -163,7 +170,7 @@ public class IntegrationBasicSuite {
}
protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
- return ImmutableMap.of(JOB_NAME, rawJobConfig);
+ return ImmutableMap.of(JOB_NAME, this.jobConfigOverrides.withFallback(rawJobConfig));
}
private void writeJobConf(String jobName, Config jobConfig) throws IOException {
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
index d6c305e..b16febb 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
@@ -17,32 +17,17 @@
package org.apache.gobblin.cluster.suite;
-import java.util.Map;
-
import org.junit.Assert;
-import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.gobblin.cluster.SleepingTask;
-import org.apache.gobblin.configuration.ConfigurationKeys;
public class IntegrationJobCancelSuite extends IntegrationBasicSuite {
public static final String JOB_ID = "job_HelloWorldTestJob_1234";
public static final String TASK_STATE_FILE = "/tmp/IntegrationJobCancelSuite/taskState/_RUNNING";
- @Override
- protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
- Config newConfig = ConfigFactory.parseMap(ImmutableMap.of(
- ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource",
- ConfigurationKeys.JOB_ID_KEY, JOB_ID,
- GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
- GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L, SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE))
- .withFallback(rawJobConfig);
- return ImmutableMap.of(JOB_NAME, newConfig);
+ public IntegrationJobCancelSuite(Config jobConfigOverrides) {
+ super(jobConfigOverrides);
}
@Override
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
index c84c1e7..412603f 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
@@ -27,7 +27,6 @@ import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import com.google.common.io.Resources;
import com.typesafe.config.Config;
@@ -36,10 +35,10 @@ import com.typesafe.config.ConfigParseOptions;
import com.typesafe.config.ConfigSyntax;
import com.typesafe.config.ConfigValueFactory;
+import org.apache.gobblin.cluster.ClusterIntegrationTestUtils;
import org.apache.gobblin.cluster.FsJobConfigurationManager;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.SleepingTask;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FsSpecConsumer;
import org.apache.gobblin.runtime.api.FsSpecProducer;
import org.apache.gobblin.runtime.api.JobSpec;
@@ -53,8 +52,8 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite
private final SpecProducer _specProducer;
- public IntegrationJobRestartViaSpecSuite() throws IOException {
- super();
+ public IntegrationJobRestartViaSpecSuite(Config jobConfigOverrides) throws IOException {
+ super(jobConfigOverrides);
FileSystem fs = FileSystem.getLocal(new Configuration());
this._specProducer = new FsSpecProducer(fs, ConfigFactory.empty().withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR)));
}
@@ -66,12 +65,7 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite
ConfigFactory.parseReader(reader, ConfigParseOptions.defaults().setSyntax(ConfigSyntax.CONF));
rawJobConfig = rawJobConfig.withFallback(getClusterConfig());
- Config newConfig = ConfigFactory.parseMap(ImmutableMap
- .of(ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource",
- ConfigurationKeys.JOB_ID_KEY, JOB_ID,
- GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
- GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 100L,
- ConfigurationKeys.JOB_NAME_KEY, JOB_NAME));
+ Config newConfig = ClusterIntegrationTestUtils.buildSleepingJob(JOB_ID, TASK_STATE_FILE, 100L);
newConfig = newConfig.withValue(SleepingTask.TASK_STATE_FILE_KEY, ConfigValueFactory.fromAnyRef(TASK_STATE_FILE));
newConfig = newConfig.withFallback(rawJobConfig);