You are viewing a plain-text version of this content. The canonical version was linked from the word "here", but that hyperlink is not preserved in this rendering.
Posted to commits@beam.apache.org by al...@apache.org on 2016/05/20 07:15:31 UTC
[13/14] incubator-beam git commit: Fix Dangling Flink DataSets
Fix Dangling Flink DataSets
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/26fa0b21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/26fa0b21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/26fa0b21
Branch: refs/heads/master
Commit: 26fa0b21cfda3049e26d47ce174a9b29fe3ec29c
Parents: 1664c96
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri May 6 08:26:50 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri May 20 08:08:24 2016 +0200
----------------------------------------------------------------------
.../translation/FlinkBatchPipelineTranslator.java | 14 ++++++++++++++
.../translation/FlinkBatchTranslationContext.java | 18 +++++++++++++++++-
2 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26fa0b21/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 3d39e81..512b822 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.translation;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -24,7 +25,9 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.values.PValue;
+import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +50,17 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
this.batchContext = new FlinkBatchTranslationContext(env, options);
}
+  /**
+   * Translates the pipeline, then terminates every dangling DataSet.
+   *
+   * <p>The Beam model allows dangling operations (outputs that no transform consumes). Every
+   * DataSet that was registered as an output but never requested as an input is therefore given
+   * a {@link DiscardingOutputFormat} sink.
+   *
+   * @param pipeline the Beam pipeline to translate
+   */
+  @Override
+  public void translate(Pipeline pipeline) {
+    super.translate(pipeline);
+
+    // Terminate dangling DataSets with a sink that drops their elements.
+    for (DataSet<?> dataSet : batchContext.getDanglingDataSets().values()) {
+      discard(dataSet);
+    }
+  }
+
+  /**
+   * Attaches a {@link DiscardingOutputFormat} sink to {@code dataSet}.
+   *
+   * <p>The type parameter captures the wildcard of the caller's {@code DataSet<?>}. This lets the
+   * output format be typed instead of raw, so no {@code @SuppressWarnings} is needed. The
+   * original {@code @SuppressWarnings("rawtypes, unchecked")} was a single unrecognized token and
+   * suppressed nothing.
+   */
+  private static <T> void discard(DataSet<T> dataSet) {
+    dataSet.output(new DiscardingOutputFormat<T>());
+  }
+
// --------------------------------------------------------------------------------------------
// Pipeline Visitor Methods
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26fa0b21/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
index 71950cf..501b1ea 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
@@ -43,6 +43,13 @@ public class FlinkBatchTranslationContext {
private final Map<PValue, DataSet<?>> dataSets;
private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets;
+ /**
+ * Tracks which DataSets don't (yet) have a successor. We need to
+ * terminate these with a discarding sink because the Beam model
+ * allows dangling operations.
+ */
+ private final Map<PValue, DataSet<?>> danglingDataSets;
+
private final ExecutionEnvironment env;
private final PipelineOptions options;
@@ -55,10 +62,16 @@ public class FlinkBatchTranslationContext {
this.options = options;
this.dataSets = new HashMap<>();
this.broadcastDataSets = new HashMap<>();
+
+ this.danglingDataSets = new HashMap<>();
}
// ------------------------------------------------------------------------
-
+
+  /**
+   * Returns the DataSets registered as outputs that have not been requested via
+   * {@code getInputDataSet}.
+   *
+   * <p>This is the live internal map, not a copy.
+   */
+  public Map<PValue, DataSet<?>> getDanglingDataSets() {
+    return this.danglingDataSets;
+  }
+
+ /** Returns the Flink ExecutionEnvironment held by this translation context. */
public ExecutionEnvironment getExecutionEnvironment() {
return env;
}
@@ -69,12 +82,15 @@ public class FlinkBatchTranslationContext {
+ /**
+ * Returns the DataSet stored for {@code value} (e.g. by setOutputDataSet), or
+ * {@code null} if none is stored.
+ *
+ * <p>The cast to {@code DataSet<T>} is unchecked; the caller must request the element
+ * type the DataSet was stored with. As a side effect, {@code value} is removed from the
+ * dangling set.
+ */
@SuppressWarnings("unchecked")
public <T> DataSet<T> getInputDataSet(PValue value) {
+ // A DataSet requested here is assumed to be consumed as an input, so it is no longer dangling.
+ danglingDataSets.remove(value);
return (DataSet<T>) dataSets.get(value);
}
+ /**
+ * Registers {@code set} as the DataSet produced for {@code value}.
+ *
+ * <p>Only the first registration for a given value is kept; later calls are ignored.
+ * A newly registered DataSet is also recorded as dangling.
+ */
public void setOutputDataSet(PValue value, DataSet<?> set) {
if (!dataSets.containsKey(value)) {
dataSets.put(value, set);
+ // Dangling until some translator requests it via getInputDataSet.
+ danglingDataSets.put(value, set);
}
}