You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/01/11 23:31:22 UTC

incubator-gobblin git commit: [Gobblin-359] fix task executor logging

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ece2858ec -> d0784cad9


[Gobblin-359] fix task executor logging

Closes #2241 from kadaan/GOBBLIN-359__Fix_TaskExecutor_Logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d0784cad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d0784cad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d0784cad

Branch: refs/heads/master
Commit: d0784cad9e809f8f1c737317b2b8096af11aea30
Parents: ece2858
Author: Joel Baranick <jo...@ensighten.com>
Authored: Thu Jan 11 15:31:08 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Jan 11 15:31:08 2018 -0800

----------------------------------------------------------------------
 .../apache/gobblin/runtime/TaskExecutor.java    | 34 +++++++++++---------
 1 file changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d0784cad/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
index 476282c..be78275 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
@@ -136,28 +136,30 @@ public class TaskExecutor extends AbstractIdleService {
     Preconditions.checkArgument(queuedTaskTimeMaxAge > 0, "Queued task time max age should be positive");
 
     // Currently a fixed-size thread pool is used to execute tasks. We probably need to revisit this later.
-    this.taskExecutor = Executors.newScheduledThreadPool(
+    this.taskExecutor = ExecutorsUtils.loggingDecorator(Executors.newScheduledThreadPool(
         taskExecutorThreadPoolSize,
-        ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("TaskExecutor-%d")));
+        ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("TaskExecutor-%d"))));
 
     this.retryIntervalInSeconds = retryIntervalInSeconds;
     this.queuedTaskTimeMaxSize = queuedTaskTimeMaxSize;
     this.queuedTaskTimeMaxAge = queuedTaskTimeMaxAge;
 
-    this.forkExecutor = new ThreadPoolExecutor(
-        // The core thread pool size is equal to that of the task executor as there's at least one fork per task
-        taskExecutorThreadPoolSize,
-        // The fork executor thread pool size is essentially unbounded. This is to make sure all forks of
-        // a task get a thread to run so all forks of the task are making progress. This is necessary since
-        // otherwise the parent task will be blocked if the record queue (bounded) of some fork is full and
-        // that fork has not yet started to run because of no available thread. The task cannot proceed in
-        // this case because it has to make sure every records go to every forks.
-        Integer.MAX_VALUE,
-        0L,
-        TimeUnit.MILLISECONDS,
-        // The work queue is a SynchronousQueue. This essentially forces a new thread to be created for each fork.
-        new SynchronousQueue<Runnable>(),
-        ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("ForkExecutor-%d")));
+    this.forkExecutor = ExecutorsUtils.loggingDecorator(
+        new ThreadPoolExecutor(
+            // The core thread pool size is equal to that of the task
+            // executor as there's at least one fork per task
+            taskExecutorThreadPoolSize,
+            // The fork executor thread pool size is essentially unbounded. This is to make sure all forks of
+            // a task get a thread to run so all forks of the task are making progress. This is necessary since
+            // otherwise the parent task will be blocked if the record queue (bounded) of some fork is full and
+            // that fork has not yet started to run because of no available thread. The task cannot proceed in
+            // this case because it has to make sure every records go to every forks.
+            Integer.MAX_VALUE,
+            0L,
+            TimeUnit.MILLISECONDS,
+            // The work queue is a SynchronousQueue. This essentially forces a new thread to be created for each fork.
+            new SynchronousQueue<Runnable>(),
+            ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("ForkExecutor-%d"))));
   }
 
   /**