You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/01/11 23:31:22 UTC
incubator-gobblin git commit: [Gobblin-359] fix task executor logging
Repository: incubator-gobblin
Updated Branches:
refs/heads/master ece2858ec -> d0784cad9
[Gobblin-359] fix task executor logging
Closes #2241 from kadaan/GOBBLIN-359__Fix_TaskExecutor_Logging
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d0784cad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d0784cad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d0784cad
Branch: refs/heads/master
Commit: d0784cad9e809f8f1c737317b2b8096af11aea30
Parents: ece2858
Author: Joel Baranick <jo...@ensighten.com>
Authored: Thu Jan 11 15:31:08 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Jan 11 15:31:08 2018 -0800
----------------------------------------------------------------------
.../apache/gobblin/runtime/TaskExecutor.java | 34 +++++++++++---------
1 file changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d0784cad/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
index 476282c..be78275 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
@@ -136,28 +136,30 @@ public class TaskExecutor extends AbstractIdleService {
Preconditions.checkArgument(queuedTaskTimeMaxAge > 0, "Queued task time max age should be positive");
// Currently a fixed-size thread pool is used to execute tasks. We probably need to revisit this later.
- this.taskExecutor = Executors.newScheduledThreadPool(
+ this.taskExecutor = ExecutorsUtils.loggingDecorator(Executors.newScheduledThreadPool(
taskExecutorThreadPoolSize,
- ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("TaskExecutor-%d")));
+ ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("TaskExecutor-%d"))));
this.retryIntervalInSeconds = retryIntervalInSeconds;
this.queuedTaskTimeMaxSize = queuedTaskTimeMaxSize;
this.queuedTaskTimeMaxAge = queuedTaskTimeMaxAge;
- this.forkExecutor = new ThreadPoolExecutor(
- // The core thread pool size is equal to that of the task executor as there's at least one fork per task
- taskExecutorThreadPoolSize,
- // The fork executor thread pool size is essentially unbounded. This is to make sure all forks of
- // a task get a thread to run so all forks of the task are making progress. This is necessary since
- // otherwise the parent task will be blocked if the record queue (bounded) of some fork is full and
- // that fork has not yet started to run because of no available thread. The task cannot proceed in
- // this case because it has to make sure every records go to every forks.
- Integer.MAX_VALUE,
- 0L,
- TimeUnit.MILLISECONDS,
- // The work queue is a SynchronousQueue. This essentially forces a new thread to be created for each fork.
- new SynchronousQueue<Runnable>(),
- ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("ForkExecutor-%d")));
+ this.forkExecutor = ExecutorsUtils.loggingDecorator(
+ new ThreadPoolExecutor(
+ // The core thread pool size is equal to that of the task
+ // executor as there's at least one fork per task
+ taskExecutorThreadPoolSize,
+ // The fork executor thread pool size is essentially unbounded. This is to make sure all forks of
+ // a task get a thread to run so all forks of the task are making progress. This is necessary since
+ // otherwise the parent task will be blocked if the record queue (bounded) of some fork is full and
+ // that fork has not yet started to run because of no available thread. The task cannot proceed in
+ // this case because it has to make sure every records go to every forks.
+ Integer.MAX_VALUE,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ // The work queue is a SynchronousQueue. This essentially forces a new thread to be created for each fork.
+ new SynchronousQueue<Runnable>(),
+ ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("ForkExecutor-%d"))));
}
/**