You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/08 19:52:34 UTC
[1/2] beam git commit: This closes #2954
Repository: beam
Updated Branches:
refs/heads/master e1791c3f8 -> 63c6bea3b
This closes #2954
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63c6bea3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63c6bea3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63c6bea3
Branch: refs/heads/master
Commit: 63c6bea3bbdb52964ac54073326d4ad99ba7432c
Parents: e1791c3 13e51c9
Author: Thomas Groh <tg...@google.com>
Authored: Mon May 8 12:52:09 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 8 12:52:09 2017 -0700
----------------------------------------------------------------------
.../runners/direct/BoundedReadEvaluatorFactory.java | 13 ++++++++++++-
.../org/apache/beam/runners/direct/DirectMetrics.java | 10 +++++++++-
.../apache/beam/runners/direct/EvaluationContext.java | 3 +--
.../direct/ExecutorServiceParallelExecutor.java | 12 +++++++++++-
.../SplittableProcessElementsEvaluatorFactory.java | 11 ++++++++++-
5 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Convert Executor Services to use Daemon Threads
Posted by tg...@apache.org.
Convert Executor Services to use Daemon Threads
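The maintenance executors (dynamic split requester, metrics counter committer, and
splittable-process checkpoint executor) now use daemon threads, so they no longer keep
the JVM alive. This will cause the DirectRunner to automatically shut down when the
(non-daemon) worker threads are shut down.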
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/13e51c9c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/13e51c9c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/13e51c9c
Branch: refs/heads/master
Commit: 13e51c9cd114eec73c47b71f46214dd92ee81048
Parents: e1791c3
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 5 19:49:29 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 8 12:52:09 2017 -0700
----------------------------------------------------------------------
.../runners/direct/BoundedReadEvaluatorFactory.java | 13 ++++++++++++-
.../org/apache/beam/runners/direct/DirectMetrics.java | 10 +++++++++-
.../apache/beam/runners/direct/EvaluationContext.java | 3 +--
.../direct/ExecutorServiceParallelExecutor.java | 12 +++++++++++-
.../SplittableProcessElementsEvaluatorFactory.java | 11 ++++++++++-
5 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/13e51c9c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 99a0fca..76db861 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -21,7 +21,9 @@ import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@@ -57,7 +59,16 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
*/
private static final long REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE = 0;
private final EvaluationContext evaluationContext;
- @VisibleForTesting final ExecutorService executor = Executors.newCachedThreadPool();
+
+ // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the DirectRunner.
+ @VisibleForTesting
+ final ExecutorService executor =
+ Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder()
+ .setThreadFactory(MoreExecutors.platformThreadFactory())
+ .setDaemon(true)
+ .setNameFormat("direct-dynamic-split-requester")
+ .build());
private final long minimumDynamicSplitSize;
http://git-wip-us.apache.org/repos/asf/beam/blob/13e51c9c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index b6ca492..b7cd6e7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -21,6 +21,8 @@ import static java.util.Arrays.asList;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
@@ -51,7 +53,13 @@ import org.apache.beam.sdk.metrics.MetricsMap;
class DirectMetrics extends MetricResults {
// TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the DirectRunner.
- private static final ExecutorService COUNTER_COMMITTER = Executors.newCachedThreadPool();
+ private static final ExecutorService COUNTER_COMMITTER =
+ Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder()
+ .setThreadFactory(MoreExecutors.platformThreadFactory())
+ .setDaemon(true)
+ .setNameFormat("direct-metrics-counter-committer")
+ .build());
private interface MetricAggregation<UpdateT, ResultT> {
UpdateT zero();
http://git-wip-us.apache.org/repos/asf/beam/blob/13e51c9c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 362ff91..c627119 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -123,8 +123,7 @@ class EvaluationContext {
this.applicationStateInternals = new ConcurrentHashMap<>();
this.metrics = new DirectMetrics();
- this.callbackExecutor =
- WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
+ this.callbackExecutor = WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
}
public void initialize(
http://git-wip-us.apache.org/repos/asf/beam/blob/13e51c9c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 02fb11a..b7f4732 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -28,6 +28,8 @@ import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -132,7 +134,15 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
EvaluationContext context) {
this.targetParallelism = targetParallelism;
- this.executorService = Executors.newFixedThreadPool(targetParallelism);
+ // Don't use Daemon threads for workers. The Pipeline should continue to execute even if there
+ // are no other active threads (for example, because waitUntilFinish was not called)
+ this.executorService =
+ Executors.newFixedThreadPool(
+ targetParallelism,
+ new ThreadFactoryBuilder()
+ .setThreadFactory(MoreExecutors.platformThreadFactory())
+ .setNameFormat("direct-runner-worker")
+ .build());
this.graph = graph;
this.rootProviderRegistry = rootProviderRegistry;
this.registry = registry;
http://git-wip-us.apache.org/repos/asf/beam/blob/13e51c9c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 2797233..f490b0b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.direct;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
@@ -171,7 +173,14 @@ class SplittableProcessElementsEvaluatorFactory<
outputWindowedValue,
evaluationContext.createSideInputReader(transform.getSideInputs()),
// TODO: For better performance, use a higher-level executor?
- Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
+ // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the
+ // DirectRunner.
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setThreadFactory(MoreExecutors.platformThreadFactory())
+ .setDaemon(true)
+ .setNameFormat("direct-splittable-process-element-checkpoint-executor")
+ .build()),
10000,
Duration.standardSeconds(10)));