You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/15 21:17:08 UTC

[3/5] incubator-beam git commit: Add TransformEvaluatorFactory#cleanup

Add TransformEvaluatorFactory#cleanup

This adds a cleanup() method to TransformEvaluatorFactory so that each factory can release any state it holds once the pipeline has shut down.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/12b19677
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/12b19677
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/12b19677

Branch: refs/heads/master
Commit: 12b19677280c11b0dca203ef266769b05c90937e
Parents: 0b1f664
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jul 15 11:27:00 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Aug 15 14:16:54 2016 -0700

----------------------------------------------------------------------
 .../direct/BoundedReadEvaluatorFactory.java     |  4 ++
 .../direct/ExecutorServiceParallelExecutor.java |  9 ++++-
 .../runners/direct/FlattenEvaluatorFactory.java |  3 ++
 .../GroupAlsoByWindowEvaluatorFactory.java      |  6 ++-
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  |  4 +-
 .../direct/ParDoMultiEvaluatorFactory.java      |  5 +++
 .../direct/ParDoSingleEvaluatorFactory.java     |  5 +++
 .../direct/TransformEvaluatorFactory.java       |  8 ++++
 .../direct/TransformEvaluatorRegistry.java      | 41 ++++++++++++++++++++
 .../direct/UnboundedReadEvaluatorFactory.java   |  3 ++
 .../runners/direct/ViewEvaluatorFactory.java    |  3 ++
 .../runners/direct/WindowEvaluatorFactory.java  |  3 ++
 12 files changed, 90 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 2f4f86c..0c4b7fd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -60,6 +60,10 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
   }
 
+  @Override
+  public void cleanup() { // NOTE(review): evaluator queues are not released here; confirm.
+  }
+
   /**
    * Get a {@link TransformEvaluator} that produces elements for the provided application of
    * {@link Bounded Read.Bounded}, initializing the queue of evaluators if required.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index a0a5ec0..8c6c6ed 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -447,13 +447,18 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
     private boolean shouldShutdown() {
       boolean shouldShutdown = exceptionThrown || evaluationContext.isDone();
       if (shouldShutdown) {
+        LOG.debug("Pipeline has terminated. Shutting down.");
+        executorService.shutdown(); // NOTE(review): does not wait for running tasks.
+        try {
+          registry.cleanup();
+        } catch (Exception e) { // NOTE(review): add() throws when the queue is full.
+          visibleUpdates.add(VisibleExecutorUpdate.fromThrowable(e));
+        }
         if (evaluationContext.isDone()) {
-          LOG.debug("Pipeline is finished. Shutting down. {}");
           while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) {
             visibleUpdates.poll();
           }
         }
-        executorService.shutdown();
       }
       return shouldShutdown;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index c84f620..5a0d31d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -43,6 +43,9 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
     return evaluator;
   }
 
+  @Override
+  public void cleanup() throws Exception {} // No-op: this factory releases nothing.
+
   private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
       final AppliedPTransform<
               PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index e052226..d16ffa0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -61,11 +61,15 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
     return evaluator;
   }
 
+  @Override
+  public void cleanup() {} // No-op: this factory releases nothing.
+
   private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(
       AppliedPTransform<
               PCollection<KeyedWorkItem<K, V>>,
               PCollection<KV<K, Iterable<V>>>,
-              DirectGroupAlsoByWindow<K, V>> application,
+              DirectGroupAlsoByWindow<K, V>>
+          application,
       CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
       EvaluationContext evaluationContext) {
     return new GroupAlsoByWindowEvaluator<>(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index 0e419c3..dbdbdaf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.direct;
 
 import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
-
 import static com.google.common.base.Preconditions.checkState;
 
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
@@ -61,6 +60,9 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
     return evaluator;
   }
 
+  @Override
+  public void cleanup() {} // No-op: this factory releases nothing.
+
   private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
       final AppliedPTransform<
           PCollection<KV<K, WindowedValue<V>>>,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index ce770ca..40533c0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -70,6 +70,11 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
     return evaluator;
   }
 
+  @Override
+  public void cleanup() {
+    // No-op: this factory releases nothing on cleanup.
+  }
+
   private <InT, OuT> TransformEvaluator<InT> createMultiEvaluator(
       AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application,
       CommittedBundle<InT> inputBundle,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 53af6af..201fb46 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -69,6 +69,11 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
     return evaluator;
   }
 
+  @Override
+  public void cleanup() {
+    // No-op: this factory releases nothing on cleanup.
+  }
+
   private <InputT, OutputT> TransformEvaluator<InputT> createSingleEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, Bound<InputT, OutputT>>
           application,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
index d021b43..3655d26 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -51,4 +51,12 @@ public interface TransformEvaluatorFactory {
   @Nullable <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle,
       EvaluationContext evaluationContext) throws Exception;
+
+  /**
+   * Cleans up any state maintained by this {@link TransformEvaluatorFactory}. Called after a
+   * {@link Pipeline} is shut down. No more calls to
+   * {@link #forApplication(AppliedPTransform, CommittedBundle, EvaluationContext)} will be made
+   * after a call to {@link #cleanup()}. Throws if any state could not be released.
+   */
+  void cleanup() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index f0afc3b..b469237 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
@@ -29,7 +31,13 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 
 import com.google.common.collect.ImmutableMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nullable;
 
@@ -38,6 +46,7 @@ import javax.annotation.Nullable;
  * implementations based on the type of {@link PTransform} of the application.
  */
 class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(TransformEvaluatorRegistry.class);
   public static TransformEvaluatorRegistry defaultRegistry() {
     @SuppressWarnings("rawtypes")
     ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> primitives =
@@ -61,6 +70,8 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
   @SuppressWarnings("rawtypes")
   private final Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories;
 
+  private final AtomicBoolean finished = new AtomicBoolean(false); // Set by cleanup().
+
   private TransformEvaluatorRegistry(
       @SuppressWarnings("rawtypes")
       Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories) {
@@ -73,7 +84,37 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
       @Nullable CommittedBundle<?> inputBundle,
       EvaluationContext evaluationContext)
       throws Exception {
+    checkState(
+        !finished.get(), "Tried to get an evaluator for a finished TransformEvaluatorRegistry");
     TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass());
     return factory.forApplication(application, inputBundle, evaluationContext);
   }
+
+  @Override
+  public void cleanup() throws Exception {
+    if (!finished.compareAndSet(false, true)) { // Flag first so forApplication rejects new work.
+      return; // Already cleaned up; factories are cleaned up at most once.
+    }
+    Collection<Exception> thrownInCleanup = new ArrayList<>();
+    for (TransformEvaluatorFactory factory : factories.values()) {
+      try {
+        factory.cleanup();
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        thrownInCleanup.add(e);
+      }
+    }
+    if (!thrownInCleanup.isEmpty()) {
+      LOG.error("Exceptions {} thrown while cleaning up evaluators", thrownInCleanup);
+      Exception toThrow = thrownInCleanup.iterator().next();
+      for (Exception e : thrownInCleanup) {
+        if (e != toThrow) { // An exception cannot suppress itself.
+          toThrow.addSuppressed(e);
+        }
+      }
+      throw toThrow;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 0e2745b..c4d408b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -113,6 +113,9 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     return evaluatorQueue.poll();
   }
 
+  @Override
+  public void cleanup() {} // NOTE(review): queued evaluators are not released; confirm.
+
   /**
    * A {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource},
    * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 362e903..3b0de4b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -59,6 +59,9 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
     return evaluator;
   }
 
+  @Override
+  public void cleanup() throws Exception {} // No-op: this factory releases nothing.
+
   private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(
       final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>>
           application,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12b19677/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 67c2f17..f2e62cb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -66,6 +66,9 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
     return new WindowIntoEvaluator<>(transform, fn, outputBundle);
   }
 
+  @Override
+  public void cleanup() {} // No-op: this factory releases nothing.
+
   private static class WindowIntoEvaluator<InputT> implements TransformEvaluator<InputT> {
     private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>>
         transform;