You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:40:39 UTC

[08/50] [abbrv] incubator-beam git commit: Address comments of Flink Side-Input PR

Address comments of Flink Side-Input PR


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6ae4b6a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6ae4b6a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6ae4b6a3

Branch: refs/heads/gearpump-runner
Commit: 6ae4b6a3df5cf3b834505fcb3f21df0e90473a0f
Parents: 8007bdf
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Aug 25 11:00:39 2016 +0200
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Sep 12 17:40:11 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SideInputHandler.java     |  6 +-
 .../apache/beam/runners/flink/FlinkRunner.java  | 86 ++++++++++++++++++--
 .../wrappers/streaming/DoFnOperator.java        | 13 ++-
 .../wrappers/streaming/WindowDoFnOperator.java  |  2 -
 4 files changed, 89 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ae4b6a3/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index a97d3f3..851ed37 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -60,7 +60,11 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
   /** The list of side inputs that we're handling. */
   protected final Collection<PCollectionView<?>> sideInputs;
 
-  /** State internals that are scoped not to the key of a value but instead to one key group. */
+  /**
+   * State internals that are not scoped to the key of a value but are global. The state can still
+   * be kept locally, but if side inputs are broadcast to all parallel operators then all of them
+   * will have the same view of the state.
+   */
   private final StateInternals<Void> stateInternals;
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ae4b6a3/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 8b1f42e..d3c65c0 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -25,8 +25,13 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
@@ -35,6 +40,7 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -47,6 +53,8 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+
 import org.apache.flink.api.common.JobExecutionResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,6 +116,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
 
   private FlinkRunner(FlinkPipelineOptions options) {
     this.options = options;
+    this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
 
     ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, Class<?>>builder();
     if (options.isStreaming()) {
@@ -124,6 +133,8 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
 
   @Override
   public FlinkRunnerResult run(Pipeline pipeline) {
+    logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
+
     LOG.info("Executing pipeline using FlinkRunner.");
 
     FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
@@ -176,6 +187,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
 
       PTransform<InputT, OutputT> customTransform =
           InstanceBuilder.ofType(customTransformClass)
+              .withArg(FlinkRunner.class, this)
               .withArg(transformClass, transform)
               .build();
 
@@ -223,6 +235,59 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
     return files;
   }
 
+  /** A set of {@link View}s with non-deterministic key coders. */
+  Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;
+
+  /**
+   * Records that the {@link PTransform} uses a non-deterministic key coder.
+   */
+  private void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
+    ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
+  }
+
+  /** Outputs a warning about PCollection views without deterministic key coders. */
+  private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline) {
+    // We need to wait until this point to determine the names of the transforms, since only
+    // now do we know the transform hierarchy; otherwise we could have just recorded the
+    // full names at apply time.
+    if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
+      final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>();
+      pipeline.traverseTopologically(new Pipeline.PipelineVisitor() {
+        @Override
+        public void visitValue(PValue value, TransformTreeNode producer) {
+        }
+
+        @Override
+        public void visitPrimitiveTransform(TransformTreeNode node) {
+          if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
+            ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
+          }
+        }
+
+        @Override
+        public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+          if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
+            ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
+          }
+          return CompositeBehavior.ENTER_TRANSFORM;
+        }
+
+        @Override
+        public void leaveCompositeTransform(TransformTreeNode node) {
+        }
+      });
+
+      LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} "
+          + "because the key coder is not deterministic. Falling back to singleton implementation "
+          + "which may cause memory and/or performance problems. Future major versions of "
+          + "the Flink runner will require deterministic key coders.",
+          ptransformViewNamesWithNonDeterministicKeyCoders);
+    }
+  }
+
+
+  /////////////////////////////////////////////////////////////////////////////
+
   /**
    * Specialized implementation for
    * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
@@ -231,8 +296,11 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
   private static class StreamingViewAsMap<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
 
+    private final FlinkRunner runner;
+
     @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsMap(View.AsMap<K, V> transform) {
+    public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) {
+      this.runner = runner;
     }
 
     @Override
@@ -248,7 +316,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
       try {
         inputCoder.getKeyCoder().verifyDeterministic();
       } catch (Coder.NonDeterministicException e) {
-//        runner.recordViewUsesNonDeterministicKeyCoder(this);
+        runner.recordViewUsesNonDeterministicKeyCoder(this);
       }
 
       return input
@@ -270,11 +338,14 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
   private static class StreamingViewAsMultimap<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
 
+    private final FlinkRunner runner;
+
     /**
      * Builds an instance of this class from the overridden transform.
      */
     @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) {
+    public StreamingViewAsMultimap(FlinkRunner runner, View.AsMultimap<K, V> transform) {
+      this.runner = runner;
     }
 
     @Override
@@ -290,7 +361,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
       try {
         inputCoder.getKeyCoder().verifyDeterministic();
       } catch (Coder.NonDeterministicException e) {
-//        runner.recordViewUsesNonDeterministicKeyCoder(this);
+        runner.recordViewUsesNonDeterministicKeyCoder(this);
       }
 
       return input
@@ -315,7 +386,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
      * Builds an instance of this class from the overridden transform.
      */
     @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsList(View.AsList<T> transform) {}
+    public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {}
 
     @Override
     public PCollectionView<List<T>> apply(PCollection<T> input) {
@@ -346,7 +417,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
      * Builds an instance of this class from the overridden transform.
      */
     @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsIterable(View.AsIterable<T> transform) { }
+    public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) { }
 
     @Override
     public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
@@ -386,7 +457,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
      * Builds an instance of this class from the overridden transform.
      */
     @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
+    public StreamingViewAsSingleton(FlinkRunner runner, View.AsSingleton<T> transform) {
       this.transform = transform;
     }
 
@@ -443,6 +514,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
      */
     @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
     public StreamingCombineGloballyAsSingletonView(
+        FlinkRunner runner,
         Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
       this.transform = transform;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ae4b6a3/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 000d69f..2c7ebc6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -75,11 +75,12 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
 /**
- * Flink operator for executing {@link DoFn DoFns}.
+ * Flink operator for executing {@link OldDoFn DoFns}.
  *
- * @param <InputT>
- * @param <FnOutputT>
- * @param <OutputT>
+ * @param <InputT> the input type of the {@link OldDoFn}
+ * @param <FnOutputT> the output type of the {@link OldDoFn}
+ * @param <OutputT> the output type of the operator; this can differ from the output type of the
+ *     {@link OldDoFn} when there are side outputs
  */
 public class DoFnOperator<InputT, FnOutputT, OutputT>
     extends AbstractStreamOperator<OutputT>
@@ -95,8 +96,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   protected final Collection<PCollectionView<?>> sideInputs;
   protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
 
-  protected final boolean hasSideInputs;
-
   protected final WindowingStrategy<?, ?> windowingStrategy;
 
   protected final OutputManagerFactory<OutputT> outputManagerFactory;
@@ -136,8 +135,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     this.windowingStrategy = windowingStrategy;
     this.outputManagerFactory = outputManagerFactory;
 
-    this.hasSideInputs = !sideInputs.isEmpty();
-
     this.pushedBackWatermarkDescriptor =
         new ReducingStateDescriptor<>(
             "pushed-back-elements-watermark-hold",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ae4b6a3/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index c6dde51..01cfa5b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -201,8 +201,6 @@ public class WindowDoFnOperator<K, InputT, OutputT>
       if (timer != null && timer.f1.getTimestamp().getMillis() < actualInputWatermark) {
         fire = true;
 
-        System.out.println("FIRING: " + timer);
-
         watermarkTimersQueue.remove();
         watermarkTimers.remove(timer);