You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/08/15 00:17:18 UTC
incubator-gobblin git commit: [GOBBLIN-518] Add option to cancel a
running job in service
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 4b5f55d08 -> 81898c07a
[GOBBLIN-518] Add option to cancel a running job in service
Closes #2425 from arjun4084346/cancelHelixJob
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/81898c07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/81898c07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/81898c07
Branch: refs/heads/master
Commit: 81898c07a79440738ccf8892c3079bc0cfa89f0f
Parents: 4b5f55d
Author: Arjun <ab...@linkedin.com>
Authored: Tue Aug 14 17:17:13 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Aug 14 17:17:13 2018 -0700
----------------------------------------------------------------------
...blinHelixDistributeJobExecutionLauncher.java | 91 ++++++++++++++-----
.../cluster/GobblinHelixJobLauncher.java | 39 ++++----
.../gobblin/cluster/GobblinHelixJobTask.java | 7 +-
.../org/apache/gobblin/cluster/HelixUtils.java | 94 +++++++++++++++++---
.../cluster/GobblinHelixJobLauncherTest.java | 5 +-
.../runtime/GobblinMultiTaskAttempt.java | 2 +-
6 files changed, 184 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index f887f0b..b5f8928 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -103,6 +103,15 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
private boolean jobSubmitted;
+ // A conditional variable for which the condition is satisfied if a cancellation is requested
+ private final Object cancellationRequest = new Object();
+ // A flag indicating whether a cancellation has been requested or not
+ private volatile boolean cancellationRequested = false;
+ // A flag indicating whether a cancellation has been executed or not
+ private volatile boolean cancellationExecuted = false;
+ @Getter
+ private DistributeJobMonitor jobMonitor;
+
public GobblinHelixDistributeJobExecutionLauncher(Builder builder) throws Exception {
this.helixManager = builder.manager;
this.helixTaskDriver = new TaskDriver(this.helixManager);
@@ -128,13 +137,23 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
}
@Override
- public void close()
- throws IOException {
- // we should delete the planning job at the end.
+ public void close() throws IOException {
+ }
+
+ private void executeCancellation() {
+ String planningName = getPlanningJobId(this.jobProperties);
if (this.jobSubmitted) {
- String planningName = getPlanningJobName(this.jobProperties);
- log.info("[DELETE] workflow {} in the close.", planningName);
- this.helixTaskDriver.delete(planningName);
+ try {
+ if (this.cancellationRequested && !this.cancellationExecuted) {
+ // TODO : fix this when HELIX-1180 is completed
+ // work flow should never be deleted explicitly because it has a expiry time
+ // If cancellation is requested, we should set the job state to CANCELLED/ABORT
+ this.helixTaskDriver.waitToStop(planningName, 10000L);
+ log.info("Stopped the workflow ", planningName);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Failed to stop workflow " + planningName + " in Helix", e);
+ }
}
}
@@ -210,7 +229,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
@Override
public DistributeJobMonitor launchJob(JobSpec jobSpec) {
- return new DistributeJobMonitor(new DistributeJobCallable(this.jobProperties));
+ this.jobMonitor = new DistributeJobMonitor(new DistributeJobCallable(this.jobProperties));
+ return this.jobMonitor;
}
@AllArgsConstructor
@@ -219,20 +239,19 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
@Override
public DistributeJobResult call()
throws Exception {
- String planningName = getPlanningJobName(this.jobProps);
String planningId = getPlanningJobId(this.jobProps);
JobConfig.Builder builder = createPlanningJob(this.jobProps);
try {
- submitJobToHelix(planningName, planningId, builder);
- return waitForJobCompletion(planningName, planningId);
+ submitJobToHelix(planningId, planningId, builder);
+ return waitForJobCompletion(planningId, planningId);
} catch (Exception e) {
- log.error(planningName + " is not able to submit.");
+ log.error(planningId + " is not able to submit.");
return new DistributeJobResult(Optional.empty(), Optional.of(e));
}
}
}
- private DistributeJobResult waitForJobCompletion(String planningName, String planningId) throws InterruptedException {
+ private DistributeJobResult waitForJobCompletion(String workFlowName, String jobName) throws InterruptedException {
boolean timeoutEnabled = Boolean.parseBoolean(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
long timeoutInSeconds = Long.parseLong(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
@@ -241,16 +260,13 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
try {
HelixUtils.waitJobCompletion(
GobblinHelixDistributeJobExecutionLauncher.this.helixManager,
- planningName,
- planningId,
+ workFlowName,
+ jobName,
timeoutEnabled ? Optional.of(timeoutInSeconds) : Optional.empty());
return getResultFromUserContent();
} catch (TimeoutException te) {
- helixTaskDriver.waitToStop(planningName, 10L);
- log.info("[DELETE] workflow {} timeout.", planningName);
- this.helixTaskDriver.delete(planningName);
- this.helixTaskDriver.resume(planningName);
- log.info("stopped the queue, deleted the job");
+ HelixUtils.handleJobTimeout(workFlowName, jobName,
+ helixManager, this, null);
return new DistributeJobResult(Optional.empty(), Optional.of(te));
}
}
@@ -282,9 +298,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
}
}
- static class DistributeJobMonitor extends FutureTask<ExecutionResult> implements JobExecutionMonitor {
+ private class DistributeJobMonitor extends FutureTask<ExecutionResult> implements JobExecutionMonitor {
private ExecutorService executor = Executors.newSingleThreadExecutor();
- public DistributeJobMonitor (Callable<ExecutionResult> c) {
+ DistributeJobMonitor (Callable<ExecutionResult> c) {
super(c);
this.executor.execute(this);
}
@@ -293,6 +309,39 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
public MonitoredObject getRunningState() {
throw new UnsupportedOperationException();
}
+
+ /**
+ * We override Future's cancel method, which means we do not send an interrupt to the underlying thread.
+ * Instead, we submit a STOP request to Helix, and the underlying thread is supposed to gracefully accept
+ * the STOPPED state and exit in the {@link #waitForJobCompletion} method.
+ * @param mayInterruptIfRunning this is ignored.
+ * @return true always
+ */
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ GobblinHelixDistributeJobExecutionLauncher.this.executeCancellation();
+ return true;
+ }
+ }
+
+ /**
+ * This method calls the underlying {@link DistributeJobMonitor}'s cancel method.
+ * It uses a conditional variable {@link GobblinHelixDistributeJobExecutionLauncher#cancellationRequest}
+ * and a flag {@link GobblinHelixDistributeJobExecutionLauncher#cancellationRequested} to avoid double cancellation.
+ */
+ public void cancel() {
+ DistributeJobMonitor jobMonitor = getJobMonitor();
+ if (jobMonitor != null) {
+ synchronized (GobblinHelixDistributeJobExecutionLauncher.this.cancellationRequest) {
+ if (GobblinHelixDistributeJobExecutionLauncher.this.cancellationRequested) {
+ // Return immediately if a cancellation has already been requested
+ return;
+ }
+ GobblinHelixDistributeJobExecutionLauncher.this.cancellationRequested = true;
+ }
+ jobMonitor.cancel(true);
+ GobblinHelixDistributeJobExecutionLauncher.this.cancellationExecuted = true;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index e2447a5..6672923 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -45,6 +45,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import javax.annotation.Nullable;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
@@ -108,6 +109,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
private final TaskDriver helixTaskDriver;
private final String helixWorkFlowName;
private final String jobResourceName;
+ @Getter
private JobListener jobListener;
private final FileSystem fs;
@@ -170,6 +172,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
this.taskStateCollectorService = new TaskStateCollectorService(jobProps, this.jobContext.getJobState(),
this.eventBus, this.stateStores.getTaskStateStore(), outputTaskStateDir);
+
startCancellationExecutor();
}
@@ -193,10 +196,17 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
TimingEvent jobSubmissionTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_SUBMISSION);
- submitJobToHelix(createJob(workUnits));
- jobSubmissionTimer.stop();
- LOGGER.info(String.format("Submitted job %s to Helix", this.jobContext.getJobId()));
- this.jobSubmitted = true;
+
+ synchronized (this.cancellationRequest) {
+ if (!this.cancellationRequested) {
+ submitJobToHelix(createJob(workUnits));
+ jobSubmissionTimer.stop();
+ LOGGER.info(String.format("Submitted job %s to Helix", this.jobContext.getJobId()));
+ this.jobSubmitted = true;
+ } else {
+ LOGGER.warn("Job {} not submitted to Helix as it was requested to be cancelled.", this.jobContext.getJobId());
+ }
+ }
TimingEvent jobRunTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_RUN);
waitForJobCompletion();
@@ -214,15 +224,15 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
protected void executeCancellation() {
if (this.jobSubmitted) {
try {
- log.info("[DELETE] workflow {}", this.helixWorkFlowName);
- if (this.cancellationRequested) {
+ if (this.cancellationRequested && !this.cancellationExecuted) {
// TODO : fix this when HELIX-1180 is completed
// work flow should never be deleted explicitly because it has a expiry time
// If cancellation is requested, we should set the job state to CANCELLED/ABORT
- this.helixTaskDriver.delete(this.helixWorkFlowName);
+ this.helixTaskDriver.waitToStop(this.helixWorkFlowName, 10000L);
+ log.info("stopped the workflow ", this.helixWorkFlowName);
}
- } catch (IllegalArgumentException e) {
- LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Failed to stop workflow " + helixWorkFlowName + " in Helix", e);
}
}
}
@@ -367,7 +377,6 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
}
private void waitForJobCompletion() throws InterruptedException {
- LOGGER.info("Waiting for job to complete...");
boolean timeoutEnabled = Boolean.parseBoolean(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
long timeoutInSeconds = Long.parseLong(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
@@ -380,14 +389,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
this.jobContext.getJobId(),
timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty());
} catch (TimeoutException te) {
- helixTaskDriver.waitToStop(helixWorkFlowName, 10L);
- try {
- cancelJob(this.jobListener);
- } catch (JobException e) {
- throw new RuntimeException("Unable to cancel job " + jobContext.getJobName() + ": ", e);
- }
- this.helixTaskDriver.resume(this.helixWorkFlowName);
- LOGGER.info("stopped the queue, deleted the job");
+ HelixUtils.handleJobTimeout(helixWorkFlowName, jobContext.getJobId(),
+ helixManager, this, this.jobListener);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
index 9ede090..4fb198c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -41,6 +41,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.source.extractor.partition.Partitioner;
@@ -133,7 +134,11 @@ public class GobblinHelixJobTask implements Task {
public void cancel() {
log.info("Cancelling planning job {}", this.planningJobId);
if (launcher != null) {
- launcher.executeCancellation();
+ try {
+ launcher.cancelJob(launcher.getJobListener());
+ } catch (JobException e) {
+ throw new RuntimeException("Unable to cancel planning job " + this.planningJobId + ": ", e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 1a11ee3..452e421 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -25,8 +25,8 @@ import org.apache.helix.HelixManager;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.task.JobConfig;
-import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;
@@ -36,6 +36,11 @@ import org.apache.helix.tools.ClusterSetup;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.JobException;
+import org.apache.gobblin.runtime.listeners.JobListener;
+
+import static org.apache.helix.task.TaskState.STOPPED;
+
/**
* A utility class for working with Gobblin on Helix.
@@ -115,15 +120,26 @@ public class HelixUtils {
// start the workflow
helixTaskDriver.start(workFlow);
log.info("Created a work flow {}", workFlowName);
+ WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
+
+ // If the helix job is deleted from some other thread or a completely external process,
+ // method waitJobCompletion() needs to differentiate between the cases where
+ // 1) workflowContext did not get initialized ever, in which case we need to keep waiting, or
+ // 2) it did get initialized but deleted soon after, in which case we should stop waiting
+ // To overcome this issue, we wait here till workflowContext gets initialized
+
+ while (workflowContext == null || workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) == null) {
+ workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
+ Thread.sleep(1000);
+ log.info("Waiting for work flow initialization.");
+ }
+ log.info("Work flow {} initialized", workFlowName);
}
- public static void waitJobCompletion(
- HelixManager helixManager,
- String workFlowName,
- String jobName,
+ static void waitJobCompletion(HelixManager helixManager, String workFlowName, String jobName,
Optional<Long> timeoutInSeconds) throws InterruptedException, TimeoutException {
- log.info("Waiting for job to complete...");
+ log.info("Waiting for job {} to complete...", jobName);
long endTime = 0;
if (timeoutInSeconds.isPresent()) {
endTime = System.currentTimeMillis() + timeoutInSeconds.get() * 1000;
@@ -132,16 +148,70 @@ public class HelixUtils {
while (!timeoutInSeconds.isPresent() || System.currentTimeMillis() <= endTime) {
WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
if (workflowContext != null) {
- org.apache.helix.task.TaskState helixJobState = workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName));
- if (helixJobState == org.apache.helix.task.TaskState.COMPLETED ||
- helixJobState == org.apache.helix.task.TaskState.FAILED ||
- helixJobState == org.apache.helix.task.TaskState.STOPPED) {
+ TaskState jobState = workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName));
+ switch (jobState) {
+ case STOPPED:
+ // user requested cancellation, which is executed by executeCancellation()
+ log.info("Job {} is cancelled, it will be deleted now.", jobName);
+ HelixUtils.deleteStoppedHelixJob(helixManager, workFlowName, jobName);
+ return;
+ case FAILED:
+ case COMPLETED:
return;
+ default:
+ log.info("Waiting for job {} to complete...", jobName);
+ Thread.sleep(1000);
}
+ } else {
+ // We have already waited for the WorkflowContext to get initialized,
+ // so if it is found null here, it must have been deleted in the job cancellation process.
+ log.info("WorkflowContext not found. Job is probably cancelled.");
+ return;
}
- Thread.sleep(1000);
}
throw new TimeoutException("task driver wait time [" + timeoutInSeconds + " sec] is expired.");
}
-}
+
+ static void handleJobTimeout(String workFlowName, String jobName, HelixManager helixManager, Object jobLauncher,
+ JobListener jobListener) throws InterruptedException {
+ try {
+ if (jobLauncher instanceof GobblinHelixJobLauncher) {
+ ((GobblinHelixJobLauncher) jobLauncher).cancelJob(jobListener);
+ } else if (jobLauncher instanceof GobblinHelixDistributeJobExecutionLauncher) {
+ ((GobblinHelixDistributeJobExecutionLauncher) jobLauncher).cancel();
+ } else {
+ log.warn("Timeout occured for unknown job launcher {}", jobLauncher.getClass());
+ }
+ } catch (JobException e) {
+ throw new RuntimeException("Unable to cancel job " + jobName + ": ", e);
+ }
+ // TODO : fix this when HELIX-1180 is completed
+ // We should not be deleting a workflow explicitly.
+ // Workflow state should be set to a final state, which will remove it automatically because expiry time is set.
+ // After that, all delete calls can be replaced by something like HelixUtils.setStateToFinal();
+ HelixUtils.deleteStoppedHelixJob(helixManager, workFlowName, jobName);
+ log.info("Stopped and deleted the workflow {}", workFlowName);
+ }
+
+ /**
+ * Deletes the stopped Helix Workflow.
+ * Caller should stop the Workflow before calling this method.
+ * @param helixManager helix manager
+ * @param workFlowName name of the workflow to be deleted
+ * @param jobName helix job name
+ * @throws InterruptedException
+ */
+ private static void deleteStoppedHelixJob(HelixManager helixManager, String workFlowName, String jobName)
+ throws InterruptedException {
+ WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
+ while (workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
+ log.info("Waiting for job {} to stop...", jobName);
+ workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
+ Thread.sleep(1000);
+ }
+ // deleting the entire workflow, as one workflow contains only one job
+ new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
+ log.info("Workflow deleted.");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index c66a9e8..1e1db0e 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -186,7 +186,9 @@ public class GobblinHelixJobLauncherTest {
properties.setProperty(ConfigurationKeys.WRITER_FILE_PATH, jobName);
- properties.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, "2");
+ // expiry time should be more than the time needed for the job to complete
+ // otherwise JobContext will become null. This is how Helix work flow works.
+ properties.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, "5");
return properties;
}
@@ -299,6 +301,7 @@ public class GobblinHelixJobLauncherTest {
final String jobIdKey1 = properties.getProperty(ConfigurationKeys.JOB_ID_KEY);
final String jobIdKey2 = properties2.getProperty(ConfigurationKeys.JOB_ID_KEY);
+
org.apache.helix.task.JobContext jobContext1 = taskDriver.getJobContext(jobIdKey1);
org.apache.helix.task.JobContext jobContext2 = taskDriver.getJobContext(jobIdKey2);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index e06f338..5784e61 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -115,6 +115,7 @@ public class GobblinMultiTaskAttempt {
this.log = LoggerFactory.getLogger(GobblinMultiTaskAttempt.class.getName() + "-" +
containerIdOptional.or("noattempt"));
this.jobBroker = jobBroker;
+ this.tasks = new ArrayList<>();
}
/**
@@ -126,7 +127,6 @@ public class GobblinMultiTaskAttempt {
throws IOException, InterruptedException {
if (!this.workUnits.hasNext()) {
log.warn("No work units to run in container " + containerIdOptional.or(""));
- this.tasks = new ArrayList<>();
return;
}