You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/05/20 07:15:31 UTC

[13/14] incubator-beam git commit: Fix Dangling Flink DataSets

Fix Dangling Flink DataSets


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/26fa0b21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/26fa0b21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/26fa0b21

Branch: refs/heads/master
Commit: 26fa0b21cfda3049e26d47ce174a9b29fe3ec29c
Parents: 1664c96
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri May 6 08:26:50 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri May 20 08:08:24 2016 +0200

----------------------------------------------------------------------
 .../translation/FlinkBatchPipelineTranslator.java | 14 ++++++++++++++
 .../translation/FlinkBatchTranslationContext.java | 18 +++++++++++++++++-
 2 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26fa0b21/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 3d39e81..512b822 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation;
 
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -24,7 +25,9 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
 import org.apache.beam.sdk.values.PValue;
 
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +50,17 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
     this.batchContext = new FlinkBatchTranslationContext(env, options);
   }
 
+  @Override
+  @SuppressWarnings("rawtypes, unchecked")
+  public void translate(Pipeline pipeline) {
+    super.translate(pipeline);
+
+    // terminate dangling DataSets
+    for (DataSet<?> dataSet: batchContext.getDanglingDataSets().values()) {
+      dataSet.output(new DiscardingOutputFormat());
+    }
+  }
+
   // --------------------------------------------------------------------------------------------
   //  Pipeline Visitor Methods
   // --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26fa0b21/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
index 71950cf..501b1ea 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
@@ -43,6 +43,13 @@ public class FlinkBatchTranslationContext {
   private final Map<PValue, DataSet<?>> dataSets;
   private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets;
 
+  /**
+   * For keeping track about which DataSets don't have a successor. We
+   * need to terminate these with a discarding sink because the Beam
+   * model allows dangling operations.
+   */
+  private final Map<PValue, DataSet<?>> danglingDataSets;
+
   private final ExecutionEnvironment env;
   private final PipelineOptions options;
 
@@ -55,10 +62,16 @@ public class FlinkBatchTranslationContext {
     this.options = options;
     this.dataSets = new HashMap<>();
     this.broadcastDataSets = new HashMap<>();
+
+    this.danglingDataSets = new HashMap<>();
   }
   
   // ------------------------------------------------------------------------
-  
+
+  public Map<PValue, DataSet<?>> getDanglingDataSets() {
+    return danglingDataSets;
+  }
+
   public ExecutionEnvironment getExecutionEnvironment() {
     return env;
   }
@@ -69,12 +82,15 @@ public class FlinkBatchTranslationContext {
   
   @SuppressWarnings("unchecked")
   public <T> DataSet<T> getInputDataSet(PValue value) {
+    // assume that the DataSet is used as an input if retrieved here
+    danglingDataSets.remove(value);
     return (DataSet<T>) dataSets.get(value);
   }
 
   public void setOutputDataSet(PValue value, DataSet<?> set) {
     if (!dataSets.containsKey(value)) {
       dataSets.put(value, set);
+      danglingDataSets.put(value, set);
     }
   }