You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/08/15 00:17:18 UTC

incubator-gobblin git commit: [GOBBLIN-518] Add option to cancel a running job in service

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 4b5f55d08 -> 81898c07a


[GOBBLIN-518] Add option to cancel a running job in service

Closes #2425 from arjun4084346/cancelHelixJob


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/81898c07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/81898c07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/81898c07

Branch: refs/heads/master
Commit: 81898c07a79440738ccf8892c3079bc0cfa89f0f
Parents: 4b5f55d
Author: Arjun <ab...@linkedin.com>
Authored: Tue Aug 14 17:17:13 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Aug 14 17:17:13 2018 -0700

----------------------------------------------------------------------
 ...blinHelixDistributeJobExecutionLauncher.java | 91 ++++++++++++++-----
 .../cluster/GobblinHelixJobLauncher.java        | 39 ++++----
 .../gobblin/cluster/GobblinHelixJobTask.java    |  7 +-
 .../org/apache/gobblin/cluster/HelixUtils.java  | 94 +++++++++++++++++---
 .../cluster/GobblinHelixJobLauncherTest.java    |  5 +-
 .../runtime/GobblinMultiTaskAttempt.java        |  2 +-
 6 files changed, 184 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index f887f0b..b5f8928 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -103,6 +103,15 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
 
   private boolean jobSubmitted;
 
+  // A conditional variable for which the condition is satisfied if a cancellation is requested
+  private final Object cancellationRequest = new Object();
+  // A flag indicating whether a cancellation has been requested or not
+  private volatile boolean cancellationRequested = false;
+  // A flag indicating whether a cancellation has been executed or not
+  private volatile boolean cancellationExecuted = false;
+  @Getter
+  private DistributeJobMonitor jobMonitor;
+
   public GobblinHelixDistributeJobExecutionLauncher(Builder builder) throws Exception {
     this.helixManager = builder.manager;
     this.helixTaskDriver = new TaskDriver(this.helixManager);
@@ -128,13 +137,23 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
   }
 
   @Override
-  public void close()
-      throws IOException {
-    // we should delete the planning job at the end.
+  public void close()  throws IOException {
+  }
+
+  private void executeCancellation() {
+    String planningName = getPlanningJobId(this.jobProperties);
     if (this.jobSubmitted) {
-      String planningName = getPlanningJobName(this.jobProperties);
-      log.info("[DELETE] workflow {} in the close.", planningName);
-      this.helixTaskDriver.delete(planningName);
+      try {
+        if (this.cancellationRequested && !this.cancellationExecuted) {
+          // TODO : fix this when HELIX-1180 is completed
+          // work flow should never be deleted explicitly because it has an expiry time
+          // If cancellation is requested, we should set the job state to CANCELLED/ABORT
+          this.helixTaskDriver.waitToStop(planningName, 10000L);
+          log.info("Stopped the workflow ", planningName);
+        }
+      } catch (InterruptedException e) {
+        throw new RuntimeException("Failed to stop workflow " + planningName + " in Helix", e);
+      }
     }
   }
 
@@ -210,7 +229,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
 
   @Override
   public DistributeJobMonitor launchJob(JobSpec jobSpec) {
-    return new DistributeJobMonitor(new DistributeJobCallable(this.jobProperties));
+    this.jobMonitor = new DistributeJobMonitor(new DistributeJobCallable(this.jobProperties));
+    return this.jobMonitor;
   }
 
   @AllArgsConstructor
@@ -219,20 +239,19 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
     @Override
     public DistributeJobResult call()
         throws Exception {
-      String planningName = getPlanningJobName(this.jobProps);
       String planningId = getPlanningJobId(this.jobProps);
       JobConfig.Builder builder = createPlanningJob(this.jobProps);
       try {
-        submitJobToHelix(planningName, planningId, builder);
-        return waitForJobCompletion(planningName, planningId);
+        submitJobToHelix(planningId, planningId, builder);
+        return waitForJobCompletion(planningId, planningId);
       } catch (Exception e) {
-        log.error(planningName + " is not able to submit.");
+        log.error(planningId + " is not able to submit.");
         return new DistributeJobResult(Optional.empty(), Optional.of(e));
       }
     }
   }
 
-  private DistributeJobResult waitForJobCompletion(String planningName, String planningId) throws InterruptedException {
+  private DistributeJobResult waitForJobCompletion(String workFlowName, String jobName) throws InterruptedException {
     boolean timeoutEnabled = Boolean.parseBoolean(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
         ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
     long timeoutInSeconds = Long.parseLong(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
@@ -241,16 +260,13 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
     try {
       HelixUtils.waitJobCompletion(
           GobblinHelixDistributeJobExecutionLauncher.this.helixManager,
-          planningName,
-          planningId,
+          workFlowName,
+          jobName,
           timeoutEnabled ? Optional.of(timeoutInSeconds) : Optional.empty());
       return getResultFromUserContent();
     } catch (TimeoutException te) {
-      helixTaskDriver.waitToStop(planningName, 10L);
-      log.info("[DELETE] workflow {} timeout.", planningName);
-      this.helixTaskDriver.delete(planningName);
-      this.helixTaskDriver.resume(planningName);
-      log.info("stopped the queue, deleted the job");
+      HelixUtils.handleJobTimeout(workFlowName, jobName,
+          helixManager, this, null);
       return new DistributeJobResult(Optional.empty(), Optional.of(te));
     }
   }
@@ -282,9 +298,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
     }
   }
 
-  static class DistributeJobMonitor extends FutureTask<ExecutionResult> implements JobExecutionMonitor {
+  private class DistributeJobMonitor extends FutureTask<ExecutionResult> implements JobExecutionMonitor {
     private ExecutorService executor = Executors.newSingleThreadExecutor();
-    public DistributeJobMonitor (Callable<ExecutionResult> c) {
+    DistributeJobMonitor (Callable<ExecutionResult> c) {
       super(c);
       this.executor.execute(this);
     }
@@ -293,6 +309,39 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
     public MonitoredObject getRunningState() {
       throw new UnsupportedOperationException();
     }
+
+    /**
+     * We override Future's cancel method, which means we do not send the interrupt to the underlying thread.
+     * Instead, we submit a STOP request to Helix, and the underlying thread is supposed to gracefully accept
+     * the STOPPED state and exit in the {@link #waitForJobCompletion} method.
+     * @param mayInterruptIfRunning this is ignored.
+     * @return true always
+     */
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      GobblinHelixDistributeJobExecutionLauncher.this.executeCancellation();
+      return true;
+    }
+  }
+
+  /**
+   * This method calls the underlying {@link DistributeJobMonitor}'s cancel method.
+   * It uses a conditional variable {@link GobblinHelixDistributeJobExecutionLauncher#cancellationRequest}
+   * and a flag {@link GobblinHelixDistributeJobExecutionLauncher#cancellationRequested} to avoid double cancellation.
+   */
+  public void cancel() {
+    DistributeJobMonitor jobMonitor = getJobMonitor();
+    if (jobMonitor != null) {
+      synchronized (GobblinHelixDistributeJobExecutionLauncher.this.cancellationRequest) {
+        if (GobblinHelixDistributeJobExecutionLauncher.this.cancellationRequested) {
+          // Return immediately if a cancellation has already been requested
+          return;
+        }
+        GobblinHelixDistributeJobExecutionLauncher.this.cancellationRequested = true;
+      }
+      jobMonitor.cancel(true);
+      GobblinHelixDistributeJobExecutionLauncher.this.cancellationExecuted = true;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index e2447a5..6672923 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -45,6 +45,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
 import javax.annotation.Nullable;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
@@ -108,6 +109,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
   private final TaskDriver helixTaskDriver;
   private final String helixWorkFlowName;
   private final String jobResourceName;
+  @Getter
   private JobListener jobListener;
 
   private final FileSystem fs;
@@ -170,6 +172,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
 
     this.taskStateCollectorService = new TaskStateCollectorService(jobProps, this.jobContext.getJobState(),
         this.eventBus, this.stateStores.getTaskStateStore(), outputTaskStateDir);
+
     startCancellationExecutor();
   }
 
@@ -193,10 +196,17 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
 
       TimingEvent jobSubmissionTimer =
           this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_SUBMISSION);
-      submitJobToHelix(createJob(workUnits));
-      jobSubmissionTimer.stop();
-      LOGGER.info(String.format("Submitted job %s to Helix", this.jobContext.getJobId()));
-      this.jobSubmitted = true;
+
+      synchronized (this.cancellationRequest) {
+        if (!this.cancellationRequested) {
+          submitJobToHelix(createJob(workUnits));
+          jobSubmissionTimer.stop();
+          LOGGER.info(String.format("Submitted job %s to Helix", this.jobContext.getJobId()));
+          this.jobSubmitted = true;
+        } else {
+          LOGGER.warn("Job {} not submitted to Helix as it was requested to be cancelled.", this.jobContext.getJobId());
+        }
+      }
 
       TimingEvent jobRunTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_RUN);
       waitForJobCompletion();
@@ -214,15 +224,15 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
   protected void executeCancellation() {
     if (this.jobSubmitted) {
       try {
-        log.info("[DELETE] workflow {}", this.helixWorkFlowName);
-        if (this.cancellationRequested) {
+        if (this.cancellationRequested && !this.cancellationExecuted) {
           // TODO : fix this when HELIX-1180 is completed
           // work flow should never be deleted explicitly because it has a expiry time
           // If cancellation is requested, we should set the job state to CANCELLED/ABORT
-          this.helixTaskDriver.delete(this.helixWorkFlowName);
+          this.helixTaskDriver.waitToStop(this.helixWorkFlowName, 10000L);
+          log.info("stopped the workflow ", this.helixWorkFlowName);
         }
-      } catch (IllegalArgumentException e) {
-        LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e);
+      } catch (InterruptedException e) {
+        throw new RuntimeException("Failed to stop workflow " + helixWorkFlowName + " in Helix", e);
       }
     }
   }
@@ -367,7 +377,6 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
   }
 
   private void waitForJobCompletion()  throws InterruptedException {
-    LOGGER.info("Waiting for job to complete...");
     boolean timeoutEnabled = Boolean.parseBoolean(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
         ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
     long timeoutInSeconds = Long.parseLong(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
@@ -380,14 +389,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
           this.jobContext.getJobId(),
           timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty());
     } catch (TimeoutException te) {
-      helixTaskDriver.waitToStop(helixWorkFlowName, 10L);
-      try {
-        cancelJob(this.jobListener);
-      } catch (JobException e) {
-        throw new RuntimeException("Unable to cancel job " + jobContext.getJobName() + ": ", e);
-      }
-      this.helixTaskDriver.resume(this.helixWorkFlowName);
-      LOGGER.info("stopped the queue, deleted the job");
+      HelixUtils.handleJobTimeout(helixWorkFlowName, jobContext.getJobId(),
+          helixManager, this, this.jobListener);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
index 9ede090..4fb198c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -41,6 +41,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.util.StateStores;
 import org.apache.gobblin.source.extractor.partition.Partitioner;
@@ -133,7 +134,11 @@ public class GobblinHelixJobTask implements Task {
   public void cancel() {
     log.info("Cancelling planning job {}", this.planningJobId);
     if (launcher != null) {
-      launcher.executeCancellation();
+      try {
+        launcher.cancelJob(launcher.getJobListener());
+      } catch (JobException e) {
+        throw new RuntimeException("Unable to cancel planning job " + this.planningJobId + ": ", e);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 1a11ee3..452e421 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -25,8 +25,8 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.task.JobConfig;
-import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowConfig;
@@ -36,6 +36,11 @@ import org.apache.helix.tools.ClusterSetup;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.JobException;
+import org.apache.gobblin.runtime.listeners.JobListener;
+
+import static org.apache.helix.task.TaskState.STOPPED;
+
 
 /**
  * A utility class for working with Gobblin on Helix.
@@ -115,15 +120,26 @@ public class HelixUtils {
     // start the workflow
     helixTaskDriver.start(workFlow);
     log.info("Created a work flow {}", workFlowName);
+    WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
+
+    // If the helix job is deleted from some other thread or a completely external process,
+    // method waitJobCompletion() needs to differentiate between the cases where
+    // 1) workflowContext was never initialized, in which case we need to keep waiting, or
+    // 2) it was initialized but deleted soon after, in which case we should stop waiting.
+    // To overcome this issue, we wait here until workflowContext gets initialized.
+
+    while (workflowContext == null || workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) == null) {
+      workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
+      Thread.sleep(1000);
+      log.info("Waiting for work flow initialization.");
+    }
+    log.info("Work flow {} initialized", workFlowName);
   }
 
-  public static void waitJobCompletion(
-      HelixManager helixManager,
-      String workFlowName,
-      String jobName,
+  static void waitJobCompletion(HelixManager helixManager, String workFlowName, String jobName,
       Optional<Long> timeoutInSeconds) throws InterruptedException, TimeoutException {
 
-    log.info("Waiting for job to complete...");
+    log.info("Waiting for job {} to complete...", jobName);
     long endTime = 0;
     if (timeoutInSeconds.isPresent()) {
       endTime = System.currentTimeMillis() + timeoutInSeconds.get() * 1000;
@@ -132,16 +148,70 @@ public class HelixUtils {
     while (!timeoutInSeconds.isPresent() || System.currentTimeMillis() <= endTime) {
       WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
       if (workflowContext != null) {
-        org.apache.helix.task.TaskState helixJobState = workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName));
-        if (helixJobState == org.apache.helix.task.TaskState.COMPLETED ||
-            helixJobState == org.apache.helix.task.TaskState.FAILED ||
-            helixJobState == org.apache.helix.task.TaskState.STOPPED) {
+        TaskState jobState = workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName));
+        switch (jobState) {
+          case STOPPED:
+            // user requested cancellation, which is executed by executeCancellation()
+            log.info("Job {} is cancelled, it will be deleted now.", jobName);
+            HelixUtils.deleteStoppedHelixJob(helixManager, workFlowName, jobName);
+            return;
+          case FAILED:
+          case COMPLETED:
           return;
+          default:
+            log.info("Waiting for job {} to complete...", jobName);
+            Thread.sleep(1000);
         }
+      } else {
+        // We have already waited for the WorkflowContext to get initialized,
+        // so if it is found null here, it must have been deleted in the job cancellation process.
+        log.info("WorkflowContext not found. Job is probably cancelled.");
+        return;
       }
-      Thread.sleep(1000);
     }
 
     throw new TimeoutException("task driver wait time [" + timeoutInSeconds + " sec] is expired.");
   }
-}
+
+  static void handleJobTimeout(String workFlowName, String jobName, HelixManager helixManager, Object jobLauncher,
+      JobListener jobListener) throws InterruptedException {
+    try {
+      if (jobLauncher instanceof GobblinHelixJobLauncher) {
+        ((GobblinHelixJobLauncher) jobLauncher).cancelJob(jobListener);
+      } else if (jobLauncher instanceof GobblinHelixDistributeJobExecutionLauncher) {
+        ((GobblinHelixDistributeJobExecutionLauncher) jobLauncher).cancel();
+      } else {
+        log.warn("Timeout occured for unknown job launcher {}", jobLauncher.getClass());
+      }
+    } catch (JobException e) {
+      throw new RuntimeException("Unable to cancel job " + jobName + ": ", e);
+    }
+    // TODO : fix this when HELIX-1180 is completed
+    // We should not be deleting a workflow explicitly.
+    // Workflow state should be set to a final state, which will remove it automatically because expiry time is set.
+    // After that, all delete calls can be replaced by something like HelixUtils.setStateToFinal();
+    HelixUtils.deleteStoppedHelixJob(helixManager, workFlowName, jobName);
+    log.info("Stopped and deleted the workflow {}", workFlowName);
+  }
+
+  /**
+   * Deletes the stopped Helix Workflow.
+   * Caller should stop the Workflow before calling this method.
+   * @param helixManager helix manager
+   * @param workFlowName name of the workflow to be deleted
+   * @param jobName helix job name
+   * @throws InterruptedException
+   */
+  private static void deleteStoppedHelixJob(HelixManager helixManager, String workFlowName, String jobName)
+      throws InterruptedException {
+    WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
+    while (workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
+      log.info("Waiting for job {} to stop...", jobName);
+      workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
+      Thread.sleep(1000);
+    }
+    // deleting the entire workflow, as one workflow contains only one job
+    new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
+    log.info("Workflow deleted.");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index c66a9e8..1e1db0e 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -186,7 +186,9 @@ public class GobblinHelixJobLauncherTest {
 
     properties.setProperty(ConfigurationKeys.WRITER_FILE_PATH, jobName);
 
-    properties.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, "2");
+    // expiry time should be more than the time needed for the job to complete,
+    // otherwise JobContext will become null. This is how Helix workflows work.
+    properties.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, "5");
 
     return properties;
   }
@@ -299,6 +301,7 @@ public class GobblinHelixJobLauncherTest {
 
     final String jobIdKey1 = properties.getProperty(ConfigurationKeys.JOB_ID_KEY);
     final String jobIdKey2 = properties2.getProperty(ConfigurationKeys.JOB_ID_KEY);
+
     org.apache.helix.task.JobContext jobContext1 = taskDriver.getJobContext(jobIdKey1);
     org.apache.helix.task.JobContext jobContext2 = taskDriver.getJobContext(jobIdKey2);
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index e06f338..5784e61 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -115,6 +115,7 @@ public class GobblinMultiTaskAttempt {
     this.log = LoggerFactory.getLogger(GobblinMultiTaskAttempt.class.getName() + "-" +
                containerIdOptional.or("noattempt"));
     this.jobBroker = jobBroker;
+    this.tasks = new ArrayList<>();
   }
 
   /**
@@ -126,7 +127,6 @@ public class GobblinMultiTaskAttempt {
       throws IOException, InterruptedException {
     if (!this.workUnits.hasNext()) {
       log.warn("No work units to run in container " + containerIdOptional.or(""));
-      this.tasks = new ArrayList<>();
       return;
     }