You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/04/23 21:52:51 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-744] Support
cancellation of a Helix workflow via a DELETE Spec.
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b450a07 [GOBBLIN-744] Support cancellation of a Helix workflow via a DELETE Spec.
b450a07 is described below
commit b450a077bdb03f42afa9cb8d674159f5b4d507d4
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Apr 23 14:52:43 2019 -0700
[GOBBLIN-744] Support cancellation of a Helix workflow via a DELETE Spec.
Dear Gobblin maintainers,
Please accept this PR. I understand that it will
not be reviewed until I have checked off all the
steps below!
### JIRA
- [x] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/)
issues and references them in the PR title. For
example, "[GOBBLIN-XXX] My Gobblin PR"
- https://issues.apache.org/jira/browse/GOBBLIN-744
### Description
- [x] Here are some details about my PR, including
screenshots (if applicable):
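This change adds the ability to interrupt and
cancel a running job on a Gobblin Helix cluster
via a DELETE Spec submitted to the
JobConfigurationManager. To cancel a running job,
the DELETE Spec must set
"gobblin.cluster.job.cancelRunningJobOnDelete"
(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE)
to true. By default (false), a DELETE Spec only
unschedules the job and removes the corresponding
JobSpec from the JobCatalog. In addition, an UPDATE
Spec now cancels the running job, waits for it to
stop, and then relaunches it with the updated
configuration.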
### Tests
- [x] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:
IntegrationJobRestartViaSpecSuite and
ClusterIntegrationTest (testJobRestartViaSpec).
### Commits
- [x] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
1. Subject is separated from body by a blank line
2. Subject is limited to 50 characters
3. Subject does not end with a period
4. Subject uses the imperative mood ("add", not
"adding")
5. Body wraps at 72 characters
6. Body explains "what" and "why", not "how"
GOBBLIN-744: Support cancellation of a Helix
workflow via a DELETE Spec.
GOBBLIN-744: Address reviewer comments.
GOBBLIN-744: Address reviewer comments.
GOBBLIN-744: Addressed reviewer comments.
Closes #2609 from sv2000/helixCancel
---
.../FsScheduledJobConfigurationManager.java | 24 ++--
.../cluster/GobblinClusterConfigurationKeys.java | 2 +
.../gobblin/cluster/GobblinHelixJobScheduler.java | 37 +++++-
.../org/apache/gobblin/cluster/SleepingTask.java | 35 ++++-
.../gobblin/cluster/ClusterIntegrationTest.java | 129 +++++++++++++++++--
.../cluster/suite/IntegrationJobCancelSuite.java | 4 +-
.../suite/IntegrationJobRestartViaSpecSuite.java | 142 +++++++++++++++++++++
.../apache/gobblin/runtime/api/FsSpecConsumer.java | 11 +-
8 files changed, 357 insertions(+), 27 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
index 21db2ff..e4776ef 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
@@ -18,7 +18,6 @@ package org.apache.gobblin.cluster;
import java.io.IOException;
import java.util.List;
-import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.tuple.Pair;
@@ -31,7 +30,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.runtime.api.FsSpecConsumer;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
-import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
@@ -53,27 +51,31 @@ public class FsScheduledJobConfigurationManager extends ScheduledJobConfiguratio
@Override
protected void fetchJobSpecs() throws ExecutionException, InterruptedException {
- List<Pair<SpecExecutor.Verb, Spec>> jobSpecs =
- (List<Pair<SpecExecutor.Verb, Spec>>) this._specConsumer.changedSpecs().get();
+ List<Pair<SpecExecutor.Verb, JobSpec>> jobSpecs =
+ (List<Pair<SpecExecutor.Verb, JobSpec>>) this._specConsumer.changedSpecs().get();
- for (Pair<SpecExecutor.Verb, Spec> entry : jobSpecs) {
- Spec spec = entry.getValue();
+ for (Pair<SpecExecutor.Verb, JobSpec> entry : jobSpecs) {
+ JobSpec jobSpec = entry.getValue();
SpecExecutor.Verb verb = entry.getKey();
- if (verb.equals(SpecExecutor.Verb.ADD) || verb.equals(SpecExecutor.Verb.UPDATE)) {
+ if (verb.equals(SpecExecutor.Verb.ADD)) {
// Handle addition
- JobSpec jobSpec = (JobSpec) spec;
this._jobCatalog.put(jobSpec);
postNewJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
+ } else if (verb.equals(SpecExecutor.Verb.UPDATE)) {
+ //Handle update.
+ //Overwrite the jobSpec in the jobCatalog and post an UpdateJobConfigArrivalEvent.
+ this._jobCatalog.put(jobSpec);
+ postUpdateJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
} else if (verb.equals(SpecExecutor.Verb.DELETE)) {
// Handle delete
- this._jobCatalog.remove(spec.getUri());
- postDeleteJobConfigArrival(spec.getUri().toString(), new Properties());
+ this._jobCatalog.remove(jobSpec.getUri());
+ postDeleteJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
}
try {
//Acknowledge the successful consumption of the JobSpec back to the SpecConsumer, so that the
//SpecConsumer can delete the JobSpec.
- this._specConsumer.commit(spec);
+ this._specConsumer.commit(jobSpec);
} catch (IOException e) {
log.error("Error when committing to FsSpecConsumer: ", e);
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 8b33e4e..d1267c5 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -159,4 +159,6 @@ public class GobblinClusterConfigurationKeys {
public static final String KILL_DUPLICATE_PLANNING_JOB = GOBBLIN_CLUSTER_PREFIX + "kill.duplicate.planningJob";
public static final boolean DEFAULT_KILL_DUPLICATE_PLANNING_JOB = true;
+ public static final String CANCEL_RUNNING_JOB_ON_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.cancelRunningJobOnDelete";
+ public static final String DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE = "false";
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 79b088b..659ff63 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -30,6 +30,7 @@ import java.util.concurrent.locks.Lock;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
+import org.apache.helix.task.TaskDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.Striped;
+import com.typesafe.config.Config;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
@@ -96,6 +98,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
final Striped<Lock> locks = Striped.lazyWeakLock(256);
private boolean startServicesCompleted;
+ private final long helixJobStopTimeoutMillis;
public GobblinHelixJobScheduler(Properties properties,
HelixManager jobHelixManager,
@@ -116,7 +119,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
this.jobCatalog = jobCatalog;
this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(properties), this.getClass());
- int metricsWindowSizeInMin = ConfigUtils.getInt(ConfigUtils.propertiesToConfig(this.properties),
+ Config jobConfig = ConfigUtils.propertiesToConfig(this.properties);
+ int metricsWindowSizeInMin = ConfigUtils.getInt(jobConfig,
ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
@@ -141,6 +145,9 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
metricsWindowSizeInMin);
this.startServicesCompleted = false;
+
+ this.helixJobStopTimeoutMillis = ConfigUtils.getLong(jobConfig, GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
+ GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS) * 1000;
}
@Override
@@ -315,6 +322,10 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
} catch (Exception je) {
LOGGER.error("Failed to update job " + updateJobArrival.getJobName(), je);
}
+
+ //Wait until the cancelled job is complete.
+ waitForJobCompletion(updateJobArrival.getJobName());
+
try {
handleNewJobConfigArrival(new NewJobConfigArrivalEvent(updateJobArrival.getJobName(),
updateJobArrival.getJobConfig()));
@@ -323,16 +334,38 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
}
}
+ private void waitForJobCompletion(String jobName) {
+ while (this.jobRunningMap.getOrDefault(jobName, false) != false) {
+ LOGGER.info("Waiting for job {} to stop...", jobName);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interrupted exception encountered: ", e);
+ }
+ }
+ }
+
@Subscribe
- public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) {
+ public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
LOGGER.info("Received delete for job configuration of job " + deleteJobArrival.getJobName());
try {
unscheduleJob(deleteJobArrival.getJobName());
+ cancelJobIfRequired(deleteJobArrival);
} catch (JobException je) {
LOGGER.error("Failed to unschedule job " + deleteJobArrival.getJobName());
}
}
+ private void cancelJobIfRequired(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
+ Properties jobConfig = deleteJobArrival.getJobConfig();
+ if (PropertiesUtils.getPropAsBoolean(jobConfig, GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
+ GobblinClusterConfigurationKeys.DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE)) {
+ LOGGER.info("Cancelling workflow: {}", deleteJobArrival.getJobName());
+ TaskDriver taskDriver = new TaskDriver(this.jobHelixManager);
+ taskDriver.waitToStop(deleteJobArrival.getJobName(), this.helixJobStopTimeoutMillis);
+ LOGGER.info("Stopped workflow: {}", deleteJobArrival.getJobName());
+ }
+ }
/**
* This class is responsible for running non-scheduled jobs.
*/
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
index 55750c0..8d06b70 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
@@ -17,24 +17,52 @@
package org.apache.gobblin.cluster;
-import avro.shaded.com.google.common.base.Throwables;
+import java.io.File;
+import java.io.IOException;
+
+import com.google.common.base.Throwables;
+import com.google.common.io.Files;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.task.BaseAbstractTask;
@Slf4j
public class SleepingTask extends BaseAbstractTask {
+ public static final String TASK_STATE_FILE_KEY = "task.state.file.path";
+
private final long sleepTime;
+ private File taskStateFile;
public SleepingTask(TaskContext taskContext) {
super(taskContext);
- sleepTime = taskContext.getTaskState().getPropAsLong("data.publisher.sleep.time.in.seconds", 10L);
+ TaskState taskState = taskContext.getTaskState();
+ sleepTime = taskState.getPropAsLong("data.publisher.sleep.time.in.seconds", 10L);
+ taskStateFile = new File(taskState.getProp(TASK_STATE_FILE_KEY));
+ try {
+ if (taskStateFile.exists()) {
+ if (!taskStateFile.delete()) {
+ log.error("Unable to delete {}", taskStateFile);
+ throw new IOException("File Delete Exception");
+ }
+ } else {
+ Files.createParentDirs(taskStateFile);
+ }
+ } catch (IOException e) {
+ log.error("Unable to create directory: ", taskStateFile.getParent());
+ Throwables.propagate(e);
+ }
+ taskStateFile.deleteOnExit();
}
@Override
public void run() {
try {
+ if (!taskStateFile.createNewFile()) {
+ throw new IOException("File creation error: " + taskStateFile.getName());
+ }
long endTime = System.currentTimeMillis() + sleepTime * 1000;
while (System.currentTimeMillis() <= endTime) {
Thread.sleep(1000L);
@@ -46,6 +74,9 @@ public class SleepingTask extends BaseAbstractTask {
log.error("Sleep interrupted.");
Thread.currentThread().interrupt();
Throwables.propagate(e);
+ } catch (IOException e) {
+ log.error("IOException encountered when creating {}", taskStateFile.getName(), e);
+ Throwables.propagate(e);
}
}
}
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index 9d5d602..7e810c3 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -17,15 +17,25 @@
package org.apache.gobblin.cluster;
+import java.io.File;
import java.io.IOException;
+import java.nio.file.Paths;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ChainedPathZkSerializer;
+import org.apache.helix.manager.zk.PathBasedZkSerializer;
+import org.apache.helix.manager.zk.ZNRecordStreamingSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskDriver;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
+import com.google.common.base.Predicate;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
@@ -35,14 +45,19 @@ import org.apache.gobblin.cluster.suite.IntegrationDedicatedManagerClusterSuite;
import org.apache.gobblin.cluster.suite.IntegrationDedicatedTaskDriverClusterSuite;
import org.apache.gobblin.cluster.suite.IntegrationJobCancelSuite;
import org.apache.gobblin.cluster.suite.IntegrationJobFactorySuite;
+import org.apache.gobblin.cluster.suite.IntegrationJobRestartViaSpecSuite;
import org.apache.gobblin.cluster.suite.IntegrationJobTagSuite;
import org.apache.gobblin.cluster.suite.IntegrationSeparateProcessSuite;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.ConfigUtils;
+
@Slf4j
public class ClusterIntegrationTest {
private IntegrationBasicSuite suite;
+ private String zkConnectString;
@Test
public void testJobShouldComplete()
@@ -51,28 +66,31 @@ public class ClusterIntegrationTest {
runAndVerify();
}
- @Test void testJobShouldGetCancelled() throws Exception {
- this.suite =new IntegrationJobCancelSuite();
+ private HelixManager getHelixManager() {
Config helixConfig = this.suite.getManagerConfig();
String clusterName = helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
String instanceName = ConfigUtils.getString(helixConfig, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
GobblinClusterManager.class.getSimpleName());
- String zkConnectString = helixConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+ this.zkConnectString = helixConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
HelixManager helixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.CONTROLLER, zkConnectString);
+ return helixManager;
+ }
+ @Test void testJobShouldGetCancelled() throws Exception {
+ this.suite =new IntegrationJobCancelSuite();
+ HelixManager helixManager = getHelixManager();
suite.startCluster();
-
helixManager.connect();
TaskDriver taskDriver = new TaskDriver(helixManager);
- while (TaskDriver.getWorkflowContext(helixManager, IntegrationJobCancelSuite.JOB_ID) == null) {
- log.warn("Waiting for the job to start...");
- Thread.sleep(1000L);
- }
+ //Ensure that Helix has created a workflow
+ AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
+ assertTrue(isTaskStarted(helixManager, IntegrationJobCancelSuite.JOB_ID), "Waiting for the job to start...");
- // Give the job some time to reach writer, where it sleeps
- Thread.sleep(2000L);
+ //Ensure that the SleepingTask is running
+ AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
+ assertTrue(isTaskRunning(IntegrationJobCancelSuite.TASK_STATE_FILE),"Waiting for the task to enter running state");
log.info("Stopping the job");
taskDriver.stop(IntegrationJobCancelSuite.JOB_ID);
@@ -82,6 +100,97 @@ public class ClusterIntegrationTest {
suite.waitForAndVerifyOutputFiles();
}
+ /**
+ * An integration test for restarting a Helix workflow via a JobSpec. This test case starts a Helix cluster with
+ * a {@link FsScheduledJobConfigurationManager}. The test case does the following:
+ * <ul>
+ * <li> add a {@link org.apache.gobblin.runtime.api.JobSpec} that uses a {@link org.apache.gobblin.cluster.SleepingCustomTaskSource})
+ * to {@link IntegrationJobRestartViaSpecSuite#FS_SPEC_CONSUMER_DIR}. which is picked by the JobConfigurationManager. </li>
+ * <li> the JobConfigurationManager sends a notification to the GobblinHelixJobScheduler which schedules the job for execution. The JobSpec is
+ * also added to the JobCatalog for persistence. Helix starts a Workflow for this JobSpec. </li>
+ * <li> We then add a {@link org.apache.gobblin.runtime.api.JobSpec} with UPDATE Verb to {@link IntegrationJobRestartViaSpecSuite#FS_SPEC_CONSUMER_DIR}.
+ * This signals GobblinHelixJobScheduler (and, Helix) to first cancel the running job (i.e., Helix Workflow) started in the previous step.
+ * <li> We inspect the state of the zNode corresponding to the Workflow resource in Zookeeper to ensure that its {@link org.apache.helix.task.TargetState}
+ * is STOP. </li>
+ * <li> Once the cancelled job from the previous steps is completed, the job will be re-launched for execution by the GobblinHelixJobScheduler.
+ * We confirm the execution by again inspecting the zNode and ensuring its TargetState is START. </li>
+ * </ul>
+ */
+ @Test (dependsOnMethods = { "testJobShouldGetCancelled" })
+ public void testJobRestartViaSpec() throws Exception {
+ this.suite = new IntegrationJobRestartViaSpecSuite();
+ HelixManager helixManager = getHelixManager();
+
+ IntegrationJobRestartViaSpecSuite restartViaSpecSuite = (IntegrationJobRestartViaSpecSuite) this.suite;
+
+ //Add a new JobSpec to the path monitored by the SpecConsumer
+ restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_ID, SpecExecutor.Verb.ADD.name());
+
+ //Start the cluster
+ restartViaSpecSuite.startCluster();
+
+ helixManager.connect();
+
+ AssertWithBackoff.create().timeoutMs(30000).maxSleepMs(1000).backoffFactor(1).
+ assertTrue(isTaskStarted(helixManager, IntegrationJobRestartViaSpecSuite.JOB_ID), "Waiting for the job to start...");
+
+ AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
+ assertTrue(isTaskRunning(IntegrationJobRestartViaSpecSuite.TASK_STATE_FILE), "Waiting for the task to enter running state");
+
+ ZkClient zkClient = new ZkClient(this.zkConnectString);
+ PathBasedZkSerializer zkSerializer = ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
+ zkClient.setZkSerializer(zkSerializer);
+
+ String clusterName = getHelixManager().getClusterName();
+ String zNodePath = Paths.get("/", clusterName, "CONFIGS", "RESOURCE", IntegrationJobRestartViaSpecSuite.JOB_ID).toString();
+
+ //Ensure that the Workflow is started
+ ZNRecord record = zkClient.readData(zNodePath);
+ String targetState = record.getSimpleField("TargetState");
+ Assert.assertEquals(targetState, TargetState.START.name());
+
+ //Add a JobSpec with UPDATE verb signalling the Helix cluster to restart the workflow
+ restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_ID, SpecExecutor.Verb.UPDATE.name());
+
+ AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(5000).backoffFactor(1).assertTrue(input -> {
+ //Inspect the zNode at the path corresponding to the Workflow resource. Ensure the target state of the resource is in
+ // the STOP state or that the zNode has been deleted.
+ ZNRecord recordNew = zkClient.readData(zNodePath, true);
+ String targetStateNew = null;
+ if (recordNew != null) {
+ targetStateNew = recordNew.getSimpleField("TargetState");
+ }
+ return recordNew == null || targetStateNew.equals(TargetState.STOP.name());
+ }, "Waiting for Workflow TargetState to be STOP");
+
+ //Ensure that the SleepingTask did not terminate normally i.e. it was interrupted. We check this by ensuring
+ // that the line "Hello World!" is not present in the logged output.
+ suite.waitForAndVerifyOutputFiles();
+
+ AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(120000).backoffFactor(1).assertTrue(input -> {
+ //Inspect the zNode at the path corresponding to the Workflow resource. Ensure the target state of the resource is in
+ // the START state.
+ ZNRecord recordNew = zkClient.readData(zNodePath, true);
+ String targetStateNew = null;
+ if (recordNew != null) {
+ targetStateNew = recordNew.getSimpleField("TargetState");
+ return targetStateNew.equals(TargetState.START.name());
+ }
+ return false;
+ }, "Waiting for Workflow TargetState to be START");
+ }
+
+ private Predicate<Void> isTaskStarted(HelixManager helixManager, String jobId) {
+ return input -> TaskDriver.getWorkflowContext(helixManager, jobId) != null;
+ }
+
+ private Predicate<Void> isTaskRunning(String taskStateFileName) {
+ return input -> {
+ File taskStateFile = new File(taskStateFileName);
+ return taskStateFile.exists();
+ };
+ }
+
@Test
public void testSeparateProcessMode()
throws Exception {
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
index 7961bf8..62b3634 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
@@ -26,11 +26,13 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.SleepingTask;
import org.apache.gobblin.configuration.ConfigurationKeys;
public class IntegrationJobCancelSuite extends IntegrationBasicSuite {
public static final String JOB_ID = "job_HelloWorldTestJob_1234";
+ public static final String TASK_STATE_FILE = "/tmp/IntegrationJobCancelSuite/taskState/_RUNNING";
@Override
protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
@@ -38,7 +40,7 @@ public class IntegrationJobCancelSuite extends IntegrationBasicSuite {
ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource",
ConfigurationKeys.JOB_ID_KEY, JOB_ID,
GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
- GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L))
+ GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L, SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE))
.withFallback(rawJobConfig);
return ImmutableMap.of("HelloWorldJob", newConfig);
}
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
new file mode 100644
index 0000000..eb47536
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster.suite;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import com.typesafe.config.ConfigSyntax;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.cluster.FsScheduledJobConfigurationManager;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.SleepingTask;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FsSpecConsumer;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite {
+ public static final String JOB_ID = "job_HelloWorldTestJob_1235";
+ public static final String JOB_CATALOG_DIR = "/tmp/IntegrationJobCancelViaSpecSuite/jobCatalog";
+ public static final String FS_SPEC_CONSUMER_DIR = "/tmp/IntegrationJobCancelViaSpecSuite/jobSpecs";
+ public static final String TASK_STATE_FILE = "/tmp/IntegrationJobCancelViaSpecSuite/taskState/_RUNNING";
+
+ public IntegrationJobRestartViaSpecSuite() throws IOException {
+ super();
+ Path jobCatalogDirPath = new Path(JOB_CATALOG_DIR);
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ if (!fs.exists(jobCatalogDirPath)) {
+ fs.mkdirs(jobCatalogDirPath);
+ }
+ }
+
+ private Map<String,String> getJobConfig() throws IOException {
+ try (InputStream resourceStream = Resources.getResource(JOB_CONF_NAME).openStream()) {
+ Reader reader = new InputStreamReader(resourceStream);
+ Config rawJobConfig =
+ ConfigFactory.parseReader(reader, ConfigParseOptions.defaults().setSyntax(ConfigSyntax.CONF));
+ rawJobConfig = rawJobConfig.withFallback(getClusterConfig());
+
+ Config newConfig = ConfigFactory.parseMap(ImmutableMap
+ .of(ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource",
+ ConfigurationKeys.JOB_ID_KEY, JOB_ID,
+ GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
+ GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 100L,
+ ConfigurationKeys.JOB_NAME_KEY, JOB_ID));
+
+ newConfig = newConfig.withValue(SleepingTask.TASK_STATE_FILE_KEY, ConfigValueFactory.fromAnyRef(TASK_STATE_FILE));
+ newConfig = newConfig.withFallback(rawJobConfig);
+
+ Properties jobProperties = ConfigUtils.configToProperties(newConfig);
+ Map<String, String> jobPropertiesAsMap = new HashMap<>();
+ for (String name : jobProperties.stringPropertyNames()) {
+ jobPropertiesAsMap.put(name, jobProperties.getProperty(name));
+ }
+ return jobPropertiesAsMap;
+ }
+ }
+
+ @Override
+ public Config getManagerConfig() {
+ Config managerConfig = super.getManagerConfig();
+ managerConfig = managerConfig.withValue(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY,
+ ConfigValueFactory.fromAnyRef(FsScheduledJobConfigurationManager.class.getName()))
+ .withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+ ConfigValueFactory.fromAnyRef(JOB_CATALOG_DIR))
+ .withValue(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY, ConfigValueFactory.fromAnyRef(FsSpecConsumer.class.getName()))
+ .withValue(GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL, ConfigValueFactory.fromAnyRef(5L))
+ .withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR));
+ return managerConfig;
+ }
+
+ public void addJobSpec(String jobSpecName, String verb) throws IOException {
+ Map<String, String> metadataMap = new HashMap<>();
+ metadataMap.put(FsSpecConsumer.VERB_KEY, verb);
+
+ Map<String, String> jobProperties = new HashMap<>();
+ if (SpecExecutor.Verb.ADD.name().equals(verb)) {
+ jobProperties = getJobConfig();
+ } else if (SpecExecutor.Verb.DELETE.name().equals(verb)) {
+ jobProperties.put(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+ } else if (SpecExecutor.Verb.UPDATE.name().equals(verb)) {
+ jobProperties = getJobConfig();
+ jobProperties.put(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+ }
+
+ AvroJobSpec jobSpec = AvroJobSpec.newBuilder().
+ setUri(Files.getNameWithoutExtension(jobSpecName)).
+ setProperties(jobProperties).
+ setTemplateUri("FS:///").
+ setDescription("HelloWorldTestJob").
+ setVersion("1").
+ setMetadata(metadataMap).build();
+
+ DatumWriter<AvroJobSpec> datumWriter = new SpecificDatumWriter<>(AvroJobSpec.SCHEMA$);
+ DataFileWriter<AvroJobSpec> dataFileWriter = new DataFileWriter<>(datumWriter);
+
+ Path fsSpecConsumerPath = new Path(FS_SPEC_CONSUMER_DIR, jobSpecName);
+ FileSystem fs = fsSpecConsumerPath.getFileSystem(new Configuration());
+ OutputStream out = fs.create(fsSpecConsumerPath);
+
+ dataFileWriter.create(AvroJobSpec.SCHEMA$, out);
+ dataFileWriter.append(jobSpec);
+ dataFileWriter.close();
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
index 58a2f99..a18be23 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
@@ -20,6 +20,8 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -62,7 +64,9 @@ public class FsSpecConsumer implements SpecConsumer<Spec> {
}
}
- /** List of newly changed {@link Spec}s for execution on {@link SpecExecutor}. */
+ /** List of newly changed {@link Spec}s for execution on {@link SpecExecutor}.
+ * The {@link Spec}s are returned in the increasing order of their modification times.
+ */
@Override
public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
List<Pair<SpecExecutor.Verb, Spec>> specList = new ArrayList<>();
@@ -74,6 +78,11 @@ public class FsSpecConsumer implements SpecConsumer<Spec> {
return null;
}
+ //Sort the {@link JobSpec}s in increasing order of their modification times.
+ //This is done so that the {JobSpec}s can be handled in FIFO order by the
+ //JobConfigurationManager and eventually, the GobblinHelixJobScheduler.
+ Arrays.sort(fileStatuses, Comparator.comparingLong(FileStatus::getModificationTime));
+
for (FileStatus fileStatus : fileStatuses) {
DataFileReader<AvroJobSpec> dataFileReader;
try {