You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/15 21:17:08 UTC
[3/5] incubator-beam git commit: Add TransformEvaluatorFactory#cleanup
Add TransformEvaluatorFactory#cleanup
This cleans up any state stored within the Transform Evaluator Factory.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/12b19677
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/12b19677
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/12b19677
Branch: refs/heads/master
Commit: 12b19677280c11b0dca203ef266769b05c90937e
Parents: 0b1f664
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jul 15 11:27:00 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Aug 15 14:16:54 2016 -0700
----------------------------------------------------------------------
.../direct/BoundedReadEvaluatorFactory.java | 4 ++
.../direct/ExecutorServiceParallelExecutor.java | 9 ++++-
.../runners/direct/FlattenEvaluatorFactory.java | 3 ++
.../GroupAlsoByWindowEvaluatorFactory.java | 6 ++-
.../direct/GroupByKeyOnlyEvaluatorFactory.java | 4 +-
.../direct/ParDoMultiEvaluatorFactory.java | 5 +++
.../direct/ParDoSingleEvaluatorFactory.java | 5 +++
.../direct/TransformEvaluatorFactory.java | 8 ++++
.../direct/TransformEvaluatorRegistry.java | 41 ++++++++++++++++++++
.../direct/UnboundedReadEvaluatorFactory.java | 3 ++
.../runners/direct/ViewEvaluatorFactory.java | 3 ++
.../runners/direct/WindowEvaluatorFactory.java | 3 ++
12 files changed, 90 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 2f4f86c..0c4b7fd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -60,6 +60,10 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
}
+ @Override
+ public void cleanup() {
+ }
+
/**
* Get a {@link TransformEvaluator} that produces elements for the provided application of
* {@link Bounded Read.Bounded}, initializing the queue of evaluators if required.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index a0a5ec0..8c6c6ed 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -447,13 +447,18 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
private boolean shouldShutdown() {
boolean shouldShutdown = exceptionThrown || evaluationContext.isDone();
if (shouldShutdown) {
+ LOG.debug("Pipeline has terminated. Shutting down.");
+ executorService.shutdown();
+ try {
+ registry.cleanup();
+ } catch (Exception e) {
+ visibleUpdates.add(VisibleExecutorUpdate.fromThrowable(e));
+ }
if (evaluationContext.isDone()) {
- LOG.debug("Pipeline is finished. Shutting down. {}");
while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) {
visibleUpdates.poll();
}
}
- executorService.shutdown();
}
return shouldShutdown;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index c84f620..5a0d31d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -43,6 +43,9 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
return evaluator;
}
+ @Override
+ public void cleanup() throws Exception {}
+
private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
final AppliedPTransform<
PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index e052226..d16ffa0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -61,11 +61,15 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
return evaluator;
}
+ @Override
+ public void cleanup() {}
+
private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(
AppliedPTransform<
PCollection<KeyedWorkItem<K, V>>,
PCollection<KV<K, Iterable<V>>>,
- DirectGroupAlsoByWindow<K, V>> application,
+ DirectGroupAlsoByWindow<K, V>>
+ application,
CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
EvaluationContext evaluationContext) {
return new GroupAlsoByWindowEvaluator<>(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index 0e419c3..dbdbdaf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.direct;
import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
-
import static com.google.common.base.Preconditions.checkState;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
@@ -61,6 +60,9 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
return evaluator;
}
+ @Override
+ public void cleanup() {}
+
private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
final AppliedPTransform<
PCollection<KV<K, WindowedValue<V>>>,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index ce770ca..40533c0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -70,6 +70,11 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
return evaluator;
}
+ @Override
+ public void cleanup() {
+
+ }
+
private <InT, OuT> TransformEvaluator<InT> createMultiEvaluator(
AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application,
CommittedBundle<InT> inputBundle,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 53af6af..201fb46 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -69,6 +69,11 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
return evaluator;
}
+ @Override
+ public void cleanup() {
+
+ }
+
private <InputT, OutputT> TransformEvaluator<InputT> createSingleEvaluator(
AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, Bound<InputT, OutputT>>
application,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
index d021b43..3655d26 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -51,4 +51,12 @@ public interface TransformEvaluatorFactory {
@Nullable <InputT> TransformEvaluator<InputT> forApplication(
AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle,
EvaluationContext evaluationContext) throws Exception;
+
+ /**
+ * Cleans up any state maintained by this {@link TransformEvaluatorFactory}. Called after a
+ * {@link Pipeline} is shut down. No more calls to
+ * {@link #forApplication(AppliedPTransform, CommittedBundle, EvaluationContext)} will be made
+ * after a call to {@link #cleanup()}.
+ */
+ void cleanup() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index f0afc3b..b469237 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.direct;
+import static com.google.common.base.Preconditions.checkState;
+
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
@@ -29,7 +31,13 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
@@ -38,6 +46,7 @@ import javax.annotation.Nullable;
* implementations based on the type of {@link PTransform} of the application.
*/
class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(TransformEvaluatorRegistry.class);
public static TransformEvaluatorRegistry defaultRegistry() {
@SuppressWarnings("rawtypes")
ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> primitives =
@@ -61,6 +70,8 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
@SuppressWarnings("rawtypes")
private final Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories;
+ private final AtomicBoolean finished = new AtomicBoolean(false);
+
private TransformEvaluatorRegistry(
@SuppressWarnings("rawtypes")
Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories) {
@@ -73,7 +84,37 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
@Nullable CommittedBundle<?> inputBundle,
EvaluationContext evaluationContext)
throws Exception {
+ checkState(
+ !finished.get(), "Tried to get an evaluator for a finished TransformEvaluatorRegistry");
TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass());
return factory.forApplication(application, inputBundle, evaluationContext);
}
+
+ @Override
+ public void cleanup() throws Exception {
+ Collection<Exception> thrownInCleanup = new ArrayList<>();
+ for (TransformEvaluatorFactory factory : factories.values()) {
+ try {
+ factory.cleanup();
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ thrownInCleanup.add(e);
+ }
+ }
+ finished.set(true);
+ if (!thrownInCleanup.isEmpty()) {
+ LOG.error("Exceptions {} thrown while cleaning up evaluators", thrownInCleanup);
+ Exception toThrow = null;
+ for (Exception e : thrownInCleanup) {
+ if (toThrow == null) {
+ toThrow = e;
+ } else {
+ toThrow.addSuppressed(e);
+ }
+ }
+ throw toThrow;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 0e2745b..c4d408b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -113,6 +113,9 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
return evaluatorQueue.poll();
}
+ @Override
+ public void cleanup() {}
+
/**
* A {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource},
* discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 362e903..3b0de4b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -59,6 +59,9 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
return evaluator;
}
+ @Override
+ public void cleanup() throws Exception {}
+
private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(
final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>>
application,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 67c2f17..f2e62cb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -66,6 +66,9 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
return new WindowIntoEvaluator<>(transform, fn, outputBundle);
}
+ @Override
+ public void cleanup() {}
+
private static class WindowIntoEvaluator<InputT> implements TransformEvaluator<InputT> {
private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>>
transform;