You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/20 00:46:30 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-963] Remove duplicated copies of TaskContext/TaskState when constructing TaskIFaceWrapper

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ec0cde  [GOBBLIN-963] Remove duplicated copies of TaskContext/TaskState when constructing TaskIFaceWrapper
2ec0cde is described below

commit 2ec0cde77870a24e2efba5acc9529fbd30ec0710
Author: Chen Guo <al...@gmail.com>
AuthorDate: Tue Nov 19 16:46:23 2019 -0800

    [GOBBLIN-963] Remove duplicated copies of TaskContext/TaskState when constructing TaskIFaceWrapper
    
    Closes #2818 from enjoyear/GOBBLIN-963
---
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   | 45 +++++++++++++---------
 1 file changed, 26 insertions(+), 19 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index fc98beb..ebd3be9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -38,6 +38,9 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
+import javax.annotation.Nullable;
+import lombok.Setter;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance;
@@ -60,9 +63,6 @@ import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
 
-import javax.annotation.Nullable;
-import lombok.Setter;
-
 
 /**
  * Attempt of running multiple {@link Task}s generated from a list of{@link WorkUnit}s.
@@ -79,6 +79,7 @@ public class GobblinMultiTaskAttempt {
      * Commit {@link GobblinMultiTaskAttempt} immediately after running is done.
      */
     IMMEDIATE,
+
     /**
      * Not committing {@link GobblinMultiTaskAttempt} but leaving it to user customized launcher.
      */
@@ -129,7 +130,8 @@ public class GobblinMultiTaskAttempt {
    * @throws IOException
    * @throws InterruptedException
    */
-  public void run() throws IOException, InterruptedException {
+  public void run()
+      throws IOException, InterruptedException {
     if (!this.workUnits.hasNext()) {
       log.warn("No work units to run in container " + containerIdOptional.or(""));
       return;
@@ -158,7 +160,8 @@ public class GobblinMultiTaskAttempt {
     log.info("All assigned tasks of job {} have completed in container {}", jobId, containerIdOptional.or(""));
   }
 
-  private void interruptTaskExecution(CountDownLatch countDownLatch) throws InterruptedException {
+  private void interruptTaskExecution(CountDownLatch countDownLatch)
+      throws InterruptedException {
     log.info("Job interrupted. Attempting a graceful shutdown of the job.");
     this.tasks.forEach(Task::shutdown);
     if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
@@ -176,7 +179,8 @@ public class GobblinMultiTaskAttempt {
    * 3. persist task statestore.
    * @throws IOException
    */
-  public void commit() throws IOException {
+  public void commit()
+      throws IOException {
     if (this.tasks == null || this.tasks.isEmpty()) {
       log.warn("No tasks to be committed in container " + containerIdOptional.or(""));
       return;
@@ -188,7 +192,8 @@ public class GobblinMultiTaskAttempt {
             return new Callable<Void>() {
               @Nullable
               @Override
-              public Void call() throws Exception {
+              public Void call()
+                  throws Exception {
                 task.commit();
                 return null;
               }
@@ -199,8 +204,8 @@ public class GobblinMultiTaskAttempt {
     try {
       List<Either<Void, ExecutionException>> executionResults =
           new IteratorExecutor<>(callableIterator, this.getTaskCommitThreadPoolSize(),
-              ExecutorsUtils.newDaemonThreadFactory(Optional.of(log),
-                  Optional.of("Task-committing-pool-%d"))).executeAndGetResults();
+              ExecutorsUtils.newDaemonThreadFactory(Optional.of(log), Optional.of("Task-committing-pool-%d")))
+              .executeAndGetResults();
       IteratorExecutor.logFailures(executionResults, log, 10);
     } catch (InterruptedException ie) {
       log.error("Committing of tasks interrupted. Aborting.");
@@ -220,7 +225,8 @@ public class GobblinMultiTaskAttempt {
    * A method that shuts down all running tasks managed by this instance.
    * TODO: Call this from the right place.
    */
-  public void shutdownTasks() throws InterruptedException {
+  public void shutdownTasks()
+      throws InterruptedException {
     log.info("Shutting down tasks");
     for (Task task : this.tasks) {
       task.shutdown();
@@ -239,7 +245,8 @@ public class GobblinMultiTaskAttempt {
     }
   }
 
-  private void persistTaskStateStore() throws IOException {
+  private void persistTaskStateStore()
+      throws IOException {
     if (!this.taskStateStoreOptional.isPresent()) {
       log.info("Task state store does not exist.");
       return;
@@ -277,8 +284,8 @@ public class GobblinMultiTaskAttempt {
         // to filter out successful tasks on subsequent attempts.
         if (task.getTaskState().getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL
             || task.getTaskState().getWorkingState() == WorkUnitState.WorkingState.COMMITTED) {
-          taskStateStore.put(task.getJobId(), task.getTaskId() + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX,
-              task.getTaskState());
+          taskStateStore
+              .put(task.getJobId(), task.getTaskId() + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX, task.getTaskState());
         }
       }
 
@@ -421,8 +428,8 @@ public class GobblinMultiTaskAttempt {
 
     this.log.info("Heap Memory");
     this.log.info(String.format(format, "init", "used", "Committed", "max"));
-    this.log.info(String.format(format, heapMemory.getInit(), heapMemory.getUsed(), heapMemory.getCommitted(),
-        heapMemory.getMax()));
+    this.log.info(String
+        .format(format, heapMemory.getInit(), heapMemory.getUsed(), heapMemory.getCommitted(), heapMemory.getMax()));
 
     this.log.info("Non-heap Memory");
     this.log.info(String.format(format, "init", "used", "Committed", "max"));
@@ -432,12 +439,12 @@ public class GobblinMultiTaskAttempt {
 
   private Task createTaskRunnable(WorkUnitState workUnitState, CountDownLatch countDownLatch) {
     Optional<TaskFactory> taskFactoryOpt = TaskUtils.getTaskFactory(workUnitState);
+    final TaskContext taskContext = new TaskContext(workUnitState);
     if (taskFactoryOpt.isPresent()) {
-      return new TaskIFaceWrapper(taskFactoryOpt.get().createTask(new TaskContext(workUnitState)),
-          new TaskContext(workUnitState), countDownLatch, this.taskStateTracker);
+      return new TaskIFaceWrapper(taskFactoryOpt.get().createTask(taskContext), taskContext, countDownLatch,
+          this.taskStateTracker);
     } else {
-      return new Task(new TaskContext(workUnitState), this.taskStateTracker, this.taskExecutor,
-          Optional.of(countDownLatch));
+      return new Task(taskContext, this.taskStateTracker, this.taskExecutor, Optional.of(countDownLatch));
     }
   }