You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/05/20 07:15:23 UTC
[05/14] incubator-beam git commit: [BEAM-270] Support
Timestamps/Windows in Flink Batch
[BEAM-270] Support Timestamps/Windows in Flink Batch
With this change we always use WindowedValue<T> for the underlying Flink
DataSets instead of just T. This allows us to support windowing as well.
This change also enables a lot of other improvements, building on the above:
- Use WindowedValue throughout
- Add proper translation for Window.into()
- Make side inputs window aware
- Make GroupByKey and Combine transformations window aware; this
includes support for merging windows. GroupByKey is implemented as a
Combine with a concatenating CombineFn, for simplicity
This removes Flink-specific transformations for things that are handled
by built-in sources/sinks; among other things this:
- Removes special translation for AvroIO.Read/Write and
TextIO.Read/Write
- Removes special support for Write.Bound, this was not working properly
and is now handled by the Beam machinery that uses DoFns for this
- Removes special translation for binary Co-Group, the code was still
in there but was never used
- Removes ConsoleIO, this can be done using a DoFn
With this change all RunnableOnService tests run on Flink Batch.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/24bfca23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/24bfca23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/24bfca23
Branch: refs/heads/master
Commit: 24bfca230d5db3cb75dd0e30093a10f7523c1238
Parents: 4e60a49
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue May 10 13:53:03 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri May 20 08:08:24 2016 +0200
----------------------------------------------------------------------
runners/flink/runner/pom.xml | 10 -
.../apache/beam/runners/flink/io/ConsoleIO.java | 82 --
.../FlinkBatchPipelineTranslator.java | 4 +-
.../FlinkBatchTransformTranslators.java | 846 ++++++++++++-------
.../FlinkBatchTranslationContext.java | 56 +-
.../FlinkStreamingTransformTranslators.java | 22 +-
.../FlinkStreamingTranslationContext.java | 29 +-
.../functions/FlinkAssignContext.java | 56 ++
.../functions/FlinkAssignWindows.java | 51 ++
.../FlinkCoGroupKeyedListAggregator.java | 61 --
.../functions/FlinkCreateFunction.java | 63 --
.../functions/FlinkDoFnFunction.java | 194 ++---
.../FlinkKeyedListAggregationFunction.java | 78 --
.../FlinkMergingNonShuffleReduceFunction.java | 238 ++++++
.../FlinkMergingPartialReduceFunction.java | 205 +++++
.../functions/FlinkMergingReduceFunction.java | 207 +++++
.../functions/FlinkMultiOutputDoFnFunction.java | 157 ++--
.../FlinkMultiOutputProcessContext.java | 176 ++++
.../FlinkMultiOutputPruningFunction.java | 25 +-
.../functions/FlinkNoElementAssignContext.java | 71 ++
.../functions/FlinkPartialReduceFunction.java | 171 +++-
.../functions/FlinkProcessContext.java | 324 +++++++
.../functions/FlinkReduceFunction.java | 174 +++-
.../functions/SideInputInitializer.java | 75 ++
.../flink/translation/functions/UnionCoder.java | 152 ----
.../translation/types/CoderTypeInformation.java | 21 +-
.../translation/types/CoderTypeSerializer.java | 14 +-
.../translation/types/KvCoderComperator.java | 102 ++-
.../types/KvCoderTypeInformation.java | 63 +-
.../types/VoidCoderTypeSerializer.java | 112 ---
.../wrappers/CombineFnAggregatorWrapper.java | 94 ---
.../SerializableFnAggregatorWrapper.java | 31 +-
.../translation/wrappers/SinkOutputFormat.java | 10 +-
.../translation/wrappers/SourceInputFormat.java | 18 +-
.../streaming/FlinkGroupByKeyWrapper.java | 10 +-
.../io/FlinkStreamingCreateFunction.java | 9 +-
.../apache/beam/runners/flink/AvroITCase.java | 129 ---
.../beam/runners/flink/FlattenizeITCase.java | 76 --
.../beam/runners/flink/JoinExamplesITCase.java | 102 ---
.../runners/flink/MaybeEmptyTestITCase.java | 66 --
.../runners/flink/ParDoMultiOutputITCase.java | 102 ---
.../beam/runners/flink/ReadSourceITCase.java | 14 +-
.../flink/RemoveDuplicatesEmptyITCase.java | 72 --
.../runners/flink/RemoveDuplicatesITCase.java | 73 --
.../beam/runners/flink/SideInputITCase.java | 70 --
.../apache/beam/runners/flink/TfIdfITCase.java | 80 --
.../beam/runners/flink/WordCountITCase.java | 77 --
.../runners/flink/WordCountJoin2ITCase.java | 140 ---
.../runners/flink/WordCountJoin3ITCase.java | 158 ----
.../flink/streaming/GroupAlsoByWindowTest.java | 3 +-
.../beam/runners/flink/util/JoinExamples.java | 161 ----
.../beam/sdk/transforms/join/UnionCoder.java | 2 +-
52 files changed, 2605 insertions(+), 2731 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index fda27a8..b29a5bf 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -191,16 +191,6 @@
]
</beamTestPipelineOptions>
</systemPropertyVariables>
- <excludes>
- <!-- Tests that use unsupported windowing -->
- <exclude>**/org/apache/beam/sdk/transforms/CombineTest.java</exclude>
- <exclude>**/org/apache/beam/sdk/transforms/GroupByKeyTest.java</exclude>
- <exclude>**/org/apache/beam/sdk/transforms/ViewTest.java</exclude>
- <exclude>**/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java</exclude>
- <exclude>**/org/apache/beam/sdk/transforms/windowing/WindowTest.java</exclude>
- <exclude>**/org/apache/beam/sdk/transforms/windowing/WindowingTest.java</exclude>
- <exclude>**/org/apache/beam/sdk/util/ReshuffleTest.java</exclude>
- </excludes>
</configuration>
</execution>
<execution>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
deleted file mode 100644
index 9c36c21..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.io;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-/**
- * Transform for printing the contents of a {@link org.apache.beam.sdk.values.PCollection}.
- * to standard output.
- *
- * This is Flink-specific and will only work when executed using the
- * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}.
- */
-public class ConsoleIO {
-
- /**
- * A PTransform that writes a PCollection to a standard output.
- */
- public static class Write {
-
- /**
- * Returns a ConsoleIO.Write PTransform with a default step name.
- */
- public static Bound create() {
- return new Bound();
- }
-
- /**
- * Returns a ConsoleIO.Write PTransform with the given step name.
- */
- public static Bound named(String name) {
- return new Bound().named(name);
- }
-
- /**
- * A PTransform that writes a bounded PCollection to standard output.
- */
- public static class Bound extends PTransform<PCollection<?>, PDone> {
- private static final long serialVersionUID = 0;
-
- Bound() {
- super("ConsoleIO.Write");
- }
-
- Bound(String name) {
- super(name);
- }
-
- /**
- * Returns a new ConsoleIO.Write PTransform that's like this one but with the given
- * step
- * name. Does not modify this object.
- */
- public Bound named(String name) {
- return new Bound(name);
- }
-
- @Override
- public PDone apply(PCollection<?> input) {
- return PDone.in(input.getPipeline());
- }
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 512b822..69c02a2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -32,8 +32,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs.
- * This is based on {@link org.apache.beam.runners.dataflow.DataflowPipelineTranslator}
+ * {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a
+ * Flink batch job.
*/
public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 07785aa..8358807 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -17,23 +17,24 @@
*/
package org.apache.beam.runners.flink.translation;
-import org.apache.beam.runners.flink.io.ConsoleIO;
-import org.apache.beam.runners.flink.translation.functions.FlinkCoGroupKeyedListAggregator;
-import org.apache.beam.runners.flink.translation.functions.FlinkCreateFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkKeyedListAggregationFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMergingPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMergingReduceFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputDoFnFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.UnionCoder;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.types.KvCoderTypeInformation;
import org.apache.beam.runners.flink.translation.wrappers.SinkOutputFormat;
import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.BoundedSource;
@@ -41,60 +42,63 @@ import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
-import com.google.api.client.util.Maps;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.io.TextInputFormat;
-import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.GroupCombineOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.Grouping;
+import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.MapPartitionOperator;
+import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
- * Translators for transforming
- * Dataflow {@link org.apache.beam.sdk.transforms.PTransform}s to
- * Flink {@link org.apache.flink.api.java.DataSet}s.
+ * Translators for transforming {@link PTransform PTransforms} to
+ * Flink {@link DataSet DataSets}.
*/
public class FlinkBatchTransformTranslators {
@@ -103,113 +107,90 @@ public class FlinkBatchTransformTranslators {
// --------------------------------------------------------------------------------------------
@SuppressWarnings("rawtypes")
- private static final Map<Class<? extends PTransform>, FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
+ private static final Map<
+ Class<? extends PTransform>,
+ FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
- // register the known translators
static {
TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch());
TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
- // we don't need this because we translate the Combine.PerKey directly
- //TRANSLATORS.put(Combine.GroupedValues.class, new CombineGroupedValuesTranslator());
-
- TRANSLATORS.put(Create.Values.class, new CreateTranslatorBatch());
+ TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
- // TODO we're currently ignoring windows here but that has to change in the future
- TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
+ TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslatorBatch());
- TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiTranslatorBatch());
TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslatorBatch());
-
- TRANSLATORS.put(CoGroupByKey.class, new CoGroupByKeyTranslatorBatch());
-
- TRANSLATORS.put(AvroIO.Read.Bound.class, new AvroIOReadTranslatorBatch());
- TRANSLATORS.put(AvroIO.Write.Bound.class, new AvroIOWriteTranslatorBatch());
+ TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiTranslatorBatch());
TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch());
- TRANSLATORS.put(Write.Bound.class, new WriteSinkTranslatorBatch());
-
- TRANSLATORS.put(TextIO.Read.Bound.class, new TextIOReadTranslatorBatch());
- TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteTranslatorBatch());
-
- // Flink-specific
- TRANSLATORS.put(ConsoleIO.Write.Bound.class, new ConsoleIOWriteTranslatorBatch());
-
}
- public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
+ public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(
+ PTransform<?, ?> transform) {
return TRANSLATORS.get(transform.getClass());
}
- private static class ReadSourceTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
+ private static class ReadSourceTranslatorBatch<T>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
@Override
public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContext context) {
String name = transform.getName();
BoundedSource<T> source = transform.getSource();
PCollection<T> output = context.getOutput(transform);
- Coder<T> coder = output.getCoder();
- TypeInformation<T> typeInformation = context.getTypeInfo(output);
+ TypeInformation<WindowedValue<T>> typeInformation = context.getTypeInfo(output);
- DataSource<T> dataSource = new DataSource<>(context.getExecutionEnvironment(),
- new SourceInputFormat<>(source, context.getPipelineOptions()), typeInformation, name);
+ DataSource<WindowedValue<T>> dataSource = new DataSource<>(
+ context.getExecutionEnvironment(),
+ new SourceInputFormat<>(source, context.getPipelineOptions()),
+ typeInformation,
+ name);
context.setOutputDataSet(output, dataSource);
}
}
- private static class AvroIOReadTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Read.Bound<T>> {
- private static final Logger LOG = LoggerFactory.getLogger(AvroIOReadTranslatorBatch.class);
+ private static class WriteSinkTranslatorBatch<T>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> {
@Override
- public void translateNode(AvroIO.Read.Bound<T> transform, FlinkBatchTranslationContext context) {
- String path = transform.getFilepattern();
+ public void translateNode(Write.Bound<T> transform, FlinkBatchTranslationContext context) {
String name = transform.getName();
-// Schema schema = transform.getSchema();
- PValue output = context.getOutput(transform);
-
- TypeInformation<T> typeInformation = context.getTypeInfo(output);
-
- // This is super hacky, but unfortunately we cannot get the type otherwise
- Class<T> extractedAvroType;
- try {
- Field typeField = transform.getClass().getDeclaredField("type");
- typeField.setAccessible(true);
- @SuppressWarnings("unchecked")
- Class<T> avroType = (Class<T>) typeField.get(transform);
- extractedAvroType = avroType;
- } catch (NoSuchFieldException | IllegalAccessException e) {
- // we know that the field is there and it is accessible
- throw new RuntimeException("Could not access type from AvroIO.Bound", e);
- }
-
- DataSource<T> source = new DataSource<>(context.getExecutionEnvironment(),
- new AvroInputFormat<>(new Path(path), extractedAvroType),
- typeInformation, name);
+ PValue input = context.getInput(transform);
+ DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
- context.setOutputDataSet(output, source);
+ inputDataSet.output(new SinkOutputFormat<>(transform, context.getPipelineOptions()))
+ .name(name);
}
}
- private static class AvroIOWriteTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> {
+ private static class AvroIOWriteTranslatorBatch<T> implements
+ FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> {
private static final Logger LOG = LoggerFactory.getLogger(AvroIOWriteTranslatorBatch.class);
+
@Override
- public void translateNode(AvroIO.Write.Bound<T> transform, FlinkBatchTranslationContext context) {
- DataSet<T> inputDataSet = context.getInputDataSet(context.getInput(transform));
+ public void translateNode(
+ AvroIO.Write.Bound<T> transform,
+ FlinkBatchTranslationContext context) {
+ DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(context.getInput(transform));
+
String filenamePrefix = transform.getFilenamePrefix();
String filenameSuffix = transform.getFilenameSuffix();
int numShards = transform.getNumShards();
String shardNameTemplate = transform.getShardNameTemplate();
// TODO: Implement these. We need Flink support for this.
- LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
+ LOG.warn(
+ "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
filenameSuffix);
- LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate);
+ LOG.warn(
+ "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
+ shardNameTemplate);
// This is super hacky, but unfortunately we cannot get the type otherwise
Class<T> extractedAvroType;
@@ -224,8 +205,17 @@ public class FlinkBatchTransformTranslators {
throw new RuntimeException("Could not access type from AvroIO.Bound", e);
}
- DataSink<T> dataSink = inputDataSet.output(new AvroOutputFormat<>(new Path
- (filenamePrefix), extractedAvroType));
+ MapOperator<WindowedValue<T>, T> valueStream = inputDataSet.map(
+ new MapFunction<WindowedValue<T>, T>() {
+ @Override
+ public T map(WindowedValue<T> value) throws Exception {
+ return value.getValue();
+ }
+ }).returns(new CoderTypeInformation<>(context.getInput(transform).getCoder()));
+
+
+ DataSink<T> dataSink = valueStream.output(
+ new AvroOutputFormat<>(new Path(filenamePrefix), extractedAvroType));
if (numShards > 0) {
dataSink.setParallelism(numShards);
@@ -233,37 +223,16 @@ public class FlinkBatchTransformTranslators {
}
}
- private static class TextIOReadTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Read.Bound<String>> {
- private static final Logger LOG = LoggerFactory.getLogger(TextIOReadTranslatorBatch.class);
-
- @Override
- public void translateNode(TextIO.Read.Bound<String> transform, FlinkBatchTranslationContext context) {
- String path = transform.getFilepattern();
- String name = transform.getName();
-
- TextIO.CompressionType compressionType = transform.getCompressionType();
- boolean needsValidation = transform.needsValidation();
-
- // TODO: Implement these. We need Flink support for this.
- LOG.warn("Translation of TextIO.CompressionType not yet supported. Is: {}.", compressionType);
- LOG.warn("Translation of TextIO.Read.needsValidation not yet supported. Is: {}.", needsValidation);
-
- PValue output = context.getOutput(transform);
-
- TypeInformation<String> typeInformation = context.getTypeInfo(output);
- DataSource<String> source = new DataSource<>(context.getExecutionEnvironment(), new TextInputFormat(new Path(path)), typeInformation, name);
-
- context.setOutputDataSet(output, source);
- }
- }
-
- private static class TextIOWriteTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> {
+ private static class TextIOWriteTranslatorBatch<T>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> {
private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteTranslatorBatch.class);
@Override
- public void translateNode(TextIO.Write.Bound<T> transform, FlinkBatchTranslationContext context) {
+ public void translateNode(
+ TextIO.Write.Bound<T> transform,
+ FlinkBatchTranslationContext context) {
PValue input = context.getInput(transform);
- DataSet<T> inputDataSet = context.getInputDataSet(input);
+ DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
String filenamePrefix = transform.getFilenamePrefix();
String filenameSuffix = transform.getFilenameSuffix();
@@ -272,12 +241,25 @@ public class FlinkBatchTransformTranslators {
String shardNameTemplate = transform.getShardNameTemplate();
// TODO: Implement these. We need Flink support for this.
- LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", needsValidation);
- LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", filenameSuffix);
- LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate);
+ LOG.warn(
+ "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
+ needsValidation);
+ LOG.warn(
+ "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
+ filenameSuffix);
+ LOG.warn(
+ "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
+ shardNameTemplate);
- //inputDataSet.print();
- DataSink<T> dataSink = inputDataSet.writeAsText(filenamePrefix);
+ MapOperator<WindowedValue<T>, T> valueStream = inputDataSet.map(
+ new MapFunction<WindowedValue<T>, T>() {
+ @Override
+ public T map(WindowedValue<T> value) throws Exception {
+ return value.getValue();
+ }
+ }).returns(new CoderTypeInformation<>(transform.getCoder()));
+
+ DataSink<T> dataSink = valueStream.writeAsText(filenamePrefix);
if (numShards > 0) {
dataSink.setParallelism(numShards);
@@ -285,148 +267,414 @@ public class FlinkBatchTransformTranslators {
}
}
- private static class ConsoleIOWriteTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ConsoleIO.Write.Bound> {
+ private static class WindowBoundTranslatorBatch<T>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Bound<T>> {
+
@Override
- public void translateNode(ConsoleIO.Write.Bound transform, FlinkBatchTranslationContext context) {
+ public void translateNode(Window.Bound<T> transform, FlinkBatchTranslationContext context) {
PValue input = context.getInput(transform);
- DataSet<?> inputDataSet = context.getInputDataSet(input);
- inputDataSet.printOnTaskManager(transform.getName());
+
+ TypeInformation<WindowedValue<T>> resultTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
+
+ DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
+
+ @SuppressWarnings("unchecked")
+ final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy =
+ (WindowingStrategy<T, ? extends BoundedWindow>)
+ context.getOutput(transform).getWindowingStrategy();
+
+ WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+
+ FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
+ new FlinkAssignWindows<>(windowFn);
+
+ DataSet<WindowedValue<T>> resultDataSet = inputDataSet
+ .flatMap(assignWindowsFunction)
+ .name(context.getOutput(transform).getName())
+ .returns(resultTypeInfo);
+
+ context.setOutputDataSet(context.getOutput(transform), resultDataSet);
}
}
- private static class WriteSinkTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> {
+ private static class GroupByKeyTranslatorBatch<K, InputT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, InputT>> {
@Override
- public void translateNode(Write.Bound<T> transform, FlinkBatchTranslationContext context) {
- String name = transform.getName();
- PValue input = context.getInput(transform);
- DataSet<T> inputDataSet = context.getInputDataSet(input);
+ public void translateNode(
+ GroupByKey<K, InputT> transform,
+ FlinkBatchTranslationContext context) {
+
+ // for now, this is copied from the Combine.PerKey translater. Once we have the new runner API
+ // we can replace GroupByKey by a Combine.PerKey with the Concatenate CombineFn
+
+ DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
+
+ Combine.KeyedCombineFn<K, InputT, List<InputT>, List<InputT>> combineFn =
+ new Concatenate<InputT>().asKeyedFn();
+
+ KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) context.getInput(transform).getCoder();
+
+ Coder<List<InputT>> accumulatorCoder;
+
+ try {
+ accumulatorCoder =
+ combineFn.getAccumulatorCoder(
+ context.getInput(transform).getPipeline().getCoderRegistry(),
+ inputCoder.getKeyCoder(),
+ inputCoder.getValueCoder());
+ } catch (CannotProvideCoderException e) {
+ throw new RuntimeException(e);
+ }
+
+ WindowingStrategy<?, ?> windowingStrategy =
+ context.getInput(transform).getWindowingStrategy();
+
+ TypeInformation<WindowedValue<KV<K, InputT>>> kvCoderTypeInformation =
+ new KvCoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ inputCoder,
+ windowingStrategy.getWindowFn().windowCoder()));
+
+ TypeInformation<WindowedValue<KV<K, List<InputT>>>> partialReduceTypeInfo =
+ new KvCoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
+ windowingStrategy.getWindowFn().windowCoder()));
+
+ Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
+ new UnsortedGrouping<>(
+ inputDataSet,
+ new Keys.ExpressionKeys<>(new String[]{"key"},
+ kvCoderTypeInformation));
+
+ FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> partialReduceFunction;
+ FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction;
+
+ if (windowingStrategy.getWindowFn().isNonMerging()) {
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<?, BoundedWindow> boundedStrategy =
+ (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+
+ partialReduceFunction = new FlinkPartialReduceFunction<>(
+ combineFn,
+ boundedStrategy,
+ Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+ context.getPipelineOptions());
+
+ reduceFunction = new FlinkReduceFunction<>(
+ combineFn,
+ boundedStrategy,
+ Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+ context.getPipelineOptions());
+
+ } else {
+ if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+ throw new UnsupportedOperationException(
+ "Merging WindowFn with windows other than IntervalWindow are not supported.");
+ }
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<?, IntervalWindow> intervalStrategy =
+ (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
+
+ partialReduceFunction = new FlinkMergingPartialReduceFunction<>(
+ combineFn,
+ intervalStrategy,
+ Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+ context.getPipelineOptions());
+
+ reduceFunction = new FlinkMergingReduceFunction<>(
+ combineFn,
+ intervalStrategy,
+ Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+ context.getPipelineOptions());
+ }
+
+ // Partially GroupReduce the values into the intermediate format AccumT (combine)
+ GroupCombineOperator<
+ WindowedValue<KV<K, InputT>>,
+ WindowedValue<KV<K, List<InputT>>>> groupCombine =
+ new GroupCombineOperator<>(
+ inputGrouping,
+ partialReduceTypeInfo,
+ partialReduceFunction,
+ "GroupCombine: " + transform.getName());
+
+ Grouping<WindowedValue<KV<K, List<InputT>>>> intermediateGrouping =
+ new UnsortedGrouping<>(
+ groupCombine, new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType()));
+
+ // Fully reduce the values and create output format VO
+ GroupReduceOperator<
+ WindowedValue<KV<K, List<InputT>>>, WindowedValue<KV<K, List<InputT>>>> outputDataSet =
+ new GroupReduceOperator<>(
+ intermediateGrouping, partialReduceTypeInfo, reduceFunction, transform.getName());
+
+ context.setOutputDataSet(context.getOutput(transform), outputDataSet);
- inputDataSet.output(new SinkOutputFormat<>(transform, context.getPipelineOptions())).name(name);
}
}
/**
- * Translates a GroupByKey while ignoring window assignments. Current ignores windows.
+ * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+ *
+ * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this
+ * is expected to crash!
+ *
+ * <p>This is copied from the dataflow runner code.
+ *
+ * @param <T> the type of elements to concatenate.
*/
- private static class GroupByKeyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> {
+ private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+ @Override
+ public List<T> createAccumulator() {
+ return new ArrayList<T>();
+ }
@Override
- public void translateNode(GroupByKey<K, V> transform, FlinkBatchTranslationContext context) {
- DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform));
- GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>();
+ public List<T> addInput(List<T> accumulator, T input) {
+ accumulator.add(input);
+ return accumulator;
+ }
- TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform));
+ @Override
+ public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+ List<T> result = createAccumulator();
+ for (List<T> accumulator : accumulators) {
+ result.addAll(accumulator);
+ }
+ return result;
+ }
- Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
+ @Override
+ public List<T> extractOutput(List<T> accumulator) {
+ return accumulator;
+ }
- GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
- new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
+ @Override
+ public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
- context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+ @Override
+ public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
}
}
- private static class CombinePerKeyTranslatorBatch<K, VI, VA, VO> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Combine.PerKey<K, VI, VO>> {
+
+ private static class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ Combine.PerKey<K, InputT, OutputT>> {
@Override
- public void translateNode(Combine.PerKey<K, VI, VO> transform, FlinkBatchTranslationContext context) {
- DataSet<KV<K, VI>> inputDataSet = context.getInputDataSet(context.getInput(transform));
+ @SuppressWarnings("unchecked")
+ public void translateNode(
+ Combine.PerKey<K, InputT, OutputT> transform,
+ FlinkBatchTranslationContext context) {
+ DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
- @SuppressWarnings("unchecked")
- Combine.KeyedCombineFn<K, VI, VA, VO> keyedCombineFn = (Combine.KeyedCombineFn<K, VI, VA, VO>) transform.getFn();
+ CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn =
+ (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn();
+
+ KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) context.getInput(transform).getCoder();
- KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) context.getInput(transform).getCoder();
+ Coder<AccumT> accumulatorCoder;
- Coder<VA> accumulatorCoder =
- null;
try {
- accumulatorCoder = keyedCombineFn.getAccumulatorCoder(context.getInput(transform).getPipeline().getCoderRegistry(), inputCoder.getKeyCoder(), inputCoder.getValueCoder());
+ accumulatorCoder =
+ combineFn.getAccumulatorCoder(
+ context.getInput(transform).getPipeline().getCoderRegistry(),
+ inputCoder.getKeyCoder(),
+ inputCoder.getValueCoder());
} catch (CannotProvideCoderException e) {
- e.printStackTrace();
- // TODO
+ throw new RuntimeException(e);
}
- TypeInformation<KV<K, VI>> kvCoderTypeInformation = new KvCoderTypeInformation<>(inputCoder);
- TypeInformation<KV<K, VA>> partialReduceTypeInfo = new KvCoderTypeInformation<>(KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder));
+ WindowingStrategy<?, ?> windowingStrategy =
+ context.getInput(transform).getWindowingStrategy();
+
+ TypeInformation<WindowedValue<KV<K, InputT>>> kvCoderTypeInformation =
+ new KvCoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ inputCoder,
+ windowingStrategy.getWindowFn().windowCoder()));
+
+ TypeInformation<WindowedValue<KV<K, AccumT>>> partialReduceTypeInfo =
+ new KvCoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
+ windowingStrategy.getWindowFn().windowCoder()));
+
+ Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
+ new UnsortedGrouping<>(
+ inputDataSet,
+ new Keys.ExpressionKeys<>(new String[]{"key"},
+ kvCoderTypeInformation));
+
+ // construct a map from side input to WindowingStrategy so that
+ // the DoFn runner can map main-input windows to side input windows
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+ for (PCollectionView<?> sideInput: transform.getSideInputs()) {
+ sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+ }
- Grouping<KV<K, VI>> inputGrouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, kvCoderTypeInformation));
+ if (windowingStrategy.getWindowFn().isNonMerging()) {
+ WindowingStrategy<?, BoundedWindow> boundedStrategy =
+ (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+
+ FlinkPartialReduceFunction<K, InputT, AccumT, ?> partialReduceFunction =
+ new FlinkPartialReduceFunction<>(
+ combineFn,
+ boundedStrategy,
+ sideInputStrategies,
+ context.getPipelineOptions());
+
+ FlinkReduceFunction<K, AccumT, OutputT, ?> reduceFunction =
+ new FlinkReduceFunction<>(
+ combineFn,
+ boundedStrategy,
+ sideInputStrategies,
+ context.getPipelineOptions());
+
+ // Partially GroupReduce the values into the intermediate format AccumT (combine)
+ GroupCombineOperator<
+ WindowedValue<KV<K, InputT>>,
+ WindowedValue<KV<K, AccumT>>> groupCombine =
+ new GroupCombineOperator<>(
+ inputGrouping,
+ partialReduceTypeInfo,
+ partialReduceFunction,
+ "GroupCombine: " + transform.getName());
+
+ transformSideInputs(transform.getSideInputs(), groupCombine, context);
+
+ TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
+
+ Grouping<WindowedValue<KV<K, AccumT>>> intermediateGrouping =
+ new UnsortedGrouping<>(
+ groupCombine,
+ new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType()));
+
+ // Fully reduce the values and create output format OutputT
+ GroupReduceOperator<
+ WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
+ new GroupReduceOperator<>(
+ intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
+
+ transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+ context.setOutputDataSet(context.getOutput(transform), outputDataSet);
- FlinkPartialReduceFunction<K, VI, VA> partialReduceFunction = new FlinkPartialReduceFunction<>(keyedCombineFn);
+ } else {
+ if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+ throw new UnsupportedOperationException(
+ "Merging WindowFn with windows other than IntervalWindow are not supported.");
+ }
- // Partially GroupReduce the values into the intermediate format VA (combine)
- GroupCombineOperator<KV<K, VI>, KV<K, VA>> groupCombine =
- new GroupCombineOperator<>(inputGrouping, partialReduceTypeInfo, partialReduceFunction,
- "GroupCombine: " + transform.getName());
+ // for merging windows we can't do a pre-shuffle combine step since
+ // elements would not be in their correct windows for side-input access
- // Reduce fully to VO
- GroupReduceFunction<KV<K, VA>, KV<K, VO>> reduceFunction = new FlinkReduceFunction<>(keyedCombineFn);
+ WindowingStrategy<?, IntervalWindow> intervalStrategy =
+ (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
- TypeInformation<KV<K, VO>> reduceTypeInfo = context.getTypeInfo(context.getOutput(transform));
+ FlinkMergingNonShuffleReduceFunction<K, InputT, AccumT, OutputT, ?> reduceFunction =
+ new FlinkMergingNonShuffleReduceFunction<>(
+ combineFn,
+ intervalStrategy,
+ sideInputStrategies,
+ context.getPipelineOptions());
- Grouping<KV<K, VA>> intermediateGrouping = new UnsortedGrouping<>(groupCombine, new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType()));
+ TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
+
+ Grouping<WindowedValue<KV<K, InputT>>> grouping =
+ new UnsortedGrouping<>(
+ inputDataSet,
+ new Keys.ExpressionKeys<>(new String[]{"key"}, kvCoderTypeInformation));
+
+ // Fully reduce the values and create output format OutputT
+ GroupReduceOperator<
+ WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
+ new GroupReduceOperator<>(
+ grouping, reduceTypeInfo, reduceFunction, transform.getName());
+
+ transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+ context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+ }
- // Fully reduce the values and create output format VO
- GroupReduceOperator<KV<K, VA>, KV<K, VO>> outputDataSet =
- new GroupReduceOperator<>(intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
- context.setOutputDataSet(context.getOutput(transform), outputDataSet);
}
}
-// private static class CombineGroupedValuesTranslator<K, VI, VO> implements FlinkPipelineTranslator.TransformTranslator<Combine.GroupedValues<K, VI, VO>> {
-//
-// @Override
-// public void translateNode(Combine.GroupedValues<K, VI, VO> transform, TranslationContext context) {
-// DataSet<KV<K, VI>> inputDataSet = context.getInputDataSet(transform.getInput());
-//
-// Combine.KeyedCombineFn<? super K, ? super VI, ?, VO> keyedCombineFn = transform.getFn();
-//
-// GroupReduceFunction<KV<K, VI>, KV<K, VO>> groupReduceFunction = new FlinkCombineFunction<>(keyedCombineFn);
-//
-// TypeInformation<KV<K, VO>> typeInformation = context.getTypeInfo(transform.getOutput());
-//
-// Grouping<KV<K, VI>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{""}, inputDataSet.getType()));
-//
-// GroupReduceOperator<KV<K, VI>, KV<K, VO>> outputDataSet =
-// new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
-// context.setOutputDataSet(transform.getOutput(), outputDataSet);
-// }
-// }
-
- private static class ParDoBoundTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> {
- private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class);
+ private static class ParDoBoundTranslatorBatch<InputT, OutputT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ ParDo.Bound<InputT, OutputT>> {
@Override
- public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkBatchTranslationContext context) {
- DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform));
+ public void translateNode(
+ ParDo.Bound<InputT, OutputT> transform,
+ FlinkBatchTranslationContext context) {
+ DataSet<WindowedValue<InputT>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
- final DoFn<IN, OUT> doFn = transform.getFn();
+ final DoFn<InputT, OutputT> doFn = transform.getFn();
- TypeInformation<OUT> typeInformation = context.getTypeInfo(context.getOutput(transform));
+ TypeInformation<WindowedValue<OutputT>> typeInformation =
+ context.getTypeInfo(context.getOutput(transform));
- FlinkDoFnFunction<IN, OUT> doFnWrapper = new FlinkDoFnFunction<>(doFn, context.getPipelineOptions());
- MapPartitionOperator<IN, OUT> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName());
+ List<PCollectionView<?>> sideInputs = transform.getSideInputs();
- transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+ // construct a map from side input to WindowingStrategy so that
+ // the DoFn runner can map main-input windows to side input windows
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+ for (PCollectionView<?> sideInput: sideInputs) {
+ sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+ }
+
+ FlinkDoFnFunction<InputT, OutputT> doFnWrapper =
+ new FlinkDoFnFunction<>(
+ doFn,
+ context.getOutput(transform).getWindowingStrategy(),
+ sideInputStrategies,
+ context.getPipelineOptions());
+
+ MapPartitionOperator<WindowedValue<InputT>, WindowedValue<OutputT>> outputDataSet =
+ new MapPartitionOperator<>(
+ inputDataSet,
+ typeInformation,
+ doFnWrapper,
+ transform.getName());
+
+ transformSideInputs(sideInputs, outputDataSet, context);
context.setOutputDataSet(context.getOutput(transform), outputDataSet);
}
}
- private static class ParDoBoundMultiTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.BoundMulti<IN, OUT>> {
- private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslatorBatch.class);
+ private static class ParDoBoundMultiTranslatorBatch<InputT, OutputT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ ParDo.BoundMulti<InputT, OutputT>> {
@Override
- public void translateNode(ParDo.BoundMulti<IN, OUT> transform, FlinkBatchTranslationContext context) {
- DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform));
+ public void translateNode(
+ ParDo.BoundMulti<InputT, OutputT> transform,
+ FlinkBatchTranslationContext context) {
+ DataSet<WindowedValue<InputT>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
- final DoFn<IN, OUT> doFn = transform.getFn();
+ final DoFn<InputT, OutputT> doFn = transform.getFn();
Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
- // put the main output at index 0, FlinkMultiOutputDoFnFunction also expects this
+ // put the main output at index 0, FlinkMultiOutputDoFnFunction expects this
outputMap.put(transform.getMainOutputTag(), 0);
int count = 1;
for (TupleTag<?> tag: outputs.keySet()) {
@@ -435,58 +683,118 @@ public class FlinkBatchTransformTranslators {
}
}
+ // assume that the windowing strategy is the same for all outputs
+ WindowingStrategy<?, ?> windowingStrategy = null;
+
// collect all output Coders and create a UnionCoder for our tagged outputs
List<Coder<?>> outputCoders = Lists.newArrayList();
for (PCollection<?> coll: outputs.values()) {
outputCoders.add(coll.getCoder());
+ windowingStrategy = coll.getWindowingStrategy();
+ }
+
+ if (windowingStrategy == null) {
+ throw new IllegalStateException("No outputs defined.");
}
UnionCoder unionCoder = UnionCoder.of(outputCoders);
- @SuppressWarnings("unchecked")
- TypeInformation<RawUnionValue> typeInformation = new CoderTypeInformation<>(unionCoder);
+ TypeInformation<WindowedValue<RawUnionValue>> typeInformation =
+ new CoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ unionCoder,
+ windowingStrategy.getWindowFn().windowCoder()));
- @SuppressWarnings("unchecked")
- FlinkMultiOutputDoFnFunction<IN, OUT> doFnWrapper = new FlinkMultiOutputDoFnFunction(doFn, context.getPipelineOptions(), outputMap);
- MapPartitionOperator<IN, RawUnionValue> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName());
+ List<PCollectionView<?>> sideInputs = transform.getSideInputs();
- transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+ // construct a map from side input to WindowingStrategy so that
+ // the DoFn runner can map main-input windows to side input windows
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+ for (PCollectionView<?> sideInput: sideInputs) {
+ sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+ }
- for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
- TypeInformation<Object> outputType = context.getTypeInfo(output.getValue());
- int outputTag = outputMap.get(output.getKey());
- FlinkMultiOutputPruningFunction<Object> pruningFunction = new FlinkMultiOutputPruningFunction<>(outputTag);
- FlatMapOperator<RawUnionValue, Object> pruningOperator = new
- FlatMapOperator<>(outputDataSet, outputType,
- pruningFunction, output.getValue().getName());
- context.setOutputDataSet(output.getValue(), pruningOperator);
+ @SuppressWarnings("unchecked")
+ FlinkMultiOutputDoFnFunction<InputT, OutputT> doFnWrapper =
+ new FlinkMultiOutputDoFnFunction(
+ doFn,
+ windowingStrategy,
+ sideInputStrategies,
+ context.getPipelineOptions(),
+ outputMap);
+
+ MapPartitionOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>> taggedDataSet =
+ new MapPartitionOperator<>(
+ inputDataSet,
+ typeInformation,
+ doFnWrapper,
+ transform.getName());
+
+ transformSideInputs(sideInputs, taggedDataSet, context);
+ for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
+ pruneOutput(
+ taggedDataSet,
+ context,
+ outputMap.get(output.getKey()),
+ (PCollection) output.getValue());
}
}
+
+ private <T> void pruneOutput(
+ MapPartitionOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>> taggedDataSet,
+ FlinkBatchTranslationContext context,
+ int integerTag,
+ PCollection<T> collection) {
+ TypeInformation<WindowedValue<T>> outputType = context.getTypeInfo(collection);
+
+ FlinkMultiOutputPruningFunction<T> pruningFunction =
+ new FlinkMultiOutputPruningFunction<>(integerTag);
+
+ FlatMapOperator<WindowedValue<RawUnionValue>, WindowedValue<T>> pruningOperator =
+ new FlatMapOperator<>(
+ taggedDataSet,
+ outputType,
+ pruningFunction,
+ collection.getName());
+
+ context.setOutputDataSet(collection, pruningOperator);
+ }
}
- private static class FlattenPCollectionTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Flatten.FlattenPCollectionList<T>> {
+ private static class FlattenPCollectionTranslatorBatch<T>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ Flatten.FlattenPCollectionList<T>> {
@Override
@SuppressWarnings("unchecked")
- public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkBatchTranslationContext context) {
+ public void translateNode(
+ Flatten.FlattenPCollectionList<T> transform,
+ FlinkBatchTranslationContext context) {
+
List<PCollection<T>> allInputs = context.getInput(transform).getAll();
- DataSet<T> result = null;
+ DataSet<WindowedValue<T>> result = null;
+
if (allInputs.isEmpty()) {
+
// create an empty dummy source to satisfy downstream operations
// we cannot create an empty source in Flink, therefore we have to
// add the flatMap that simply never forwards the single element
DataSource<String> dummySource =
context.getExecutionEnvironment().fromElements("dummy");
- result = dummySource.flatMap(new FlatMapFunction<String, T>() {
+ result = dummySource.flatMap(new FlatMapFunction<String, WindowedValue<T>>() {
@Override
- public void flatMap(String s, Collector<T> collector) throws Exception {
+ public void flatMap(String s, Collector<WindowedValue<T>> collector) throws Exception {
// never return anything
}
- }).returns(new CoderTypeInformation<>((Coder<T>) VoidCoder.of()));
+ }).returns(
+ new CoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ (Coder<T>) VoidCoder.of(),
+ GlobalWindow.Coder.INSTANCE)));
} else {
for (PCollection<T> collection : allInputs) {
- DataSet<T> current = context.getInputDataSet(collection);
+ DataSet<WindowedValue<T>> current = context.getInputDataSet(collection);
if (result == null) {
result = current;
} else {
@@ -494,103 +802,47 @@ public class FlinkBatchTransformTranslators {
}
}
}
- context.setOutputDataSet(context.getOutput(transform), result);
- }
- }
- private static class CreatePCollectionViewTranslatorBatch<R, T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<View.CreatePCollectionView<R, T>> {
- @Override
- public void translateNode(View.CreatePCollectionView<R, T> transform, FlinkBatchTranslationContext context) {
- DataSet<T> inputDataSet = context.getInputDataSet(context.getInput(transform));
- PCollectionView<T> input = transform.apply(null);
- context.setSideInputDataSet(input, inputDataSet);
+ // insert a dummy filter, there seems to be a bug in Flink
+ // that produces duplicate elements after the union in some cases
+ // if we don't
+ result = result.filter(new FilterFunction<WindowedValue<T>>() {
+ @Override
+ public boolean filter(WindowedValue<T> tWindowedValue) throws Exception {
+ return true;
+ }
+ }).name("UnionFixFilter");
+ context.setOutputDataSet(context.getOutput(transform), result);
}
}
- private static class CreateTranslatorBatch<OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Create.Values<OUT>> {
+ private static class CreatePCollectionViewTranslatorBatch<ElemT, ViewT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ View.CreatePCollectionView<ElemT, ViewT>> {
@Override
- public void translateNode(Create.Values<OUT> transform, FlinkBatchTranslationContext context) {
- TypeInformation<OUT> typeInformation = context.getOutputTypeInfo();
- Iterable<OUT> elements = transform.getElements();
-
- // we need to serialize the elements to byte arrays, since they might contain
- // elements that are not serializable by Java serialization. We deserialize them
- // in the FlatMap function using the Coder.
-
- List<byte[]> serializedElements = Lists.newArrayList();
- Coder<OUT> coder = context.getOutput(transform).getCoder();
- for (OUT element: elements) {
- ByteArrayOutputStream bao = new ByteArrayOutputStream();
- try {
- coder.encode(element, bao, Coder.Context.OUTER);
- serializedElements.add(bao.toByteArray());
- } catch (IOException e) {
- throw new RuntimeException("Could not serialize Create elements using Coder: " + e);
- }
- }
+ public void translateNode(
+ View.CreatePCollectionView<ElemT, ViewT> transform,
+ FlinkBatchTranslationContext context) {
+ DataSet<WindowedValue<ElemT>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
- DataSet<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1);
- FlinkCreateFunction<Integer, OUT> flatMapFunction = new FlinkCreateFunction<>(serializedElements, coder);
- FlatMapOperator<Integer, OUT> outputDataSet = new FlatMapOperator<>(initDataSet, typeInformation, flatMapFunction, transform.getName());
+ PCollectionView<ViewT> input = transform.getView();
- context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+ context.setSideInputDataSet(input, inputDataSet);
}
}
- private static void transformSideInputs(List<PCollectionView<?>> sideInputs,
- MapPartitionOperator<?, ?> outputDataSet,
- FlinkBatchTranslationContext context) {
+ private static void transformSideInputs(
+ List<PCollectionView<?>> sideInputs,
+ SingleInputUdfOperator<?, ?, ?> outputDataSet,
+ FlinkBatchTranslationContext context) {
// get corresponding Flink broadcast DataSets
- for(PCollectionView<?> input : sideInputs) {
+ for (PCollectionView<?> input : sideInputs) {
DataSet<?> broadcastSet = context.getSideInputDataSet(input);
outputDataSet.withBroadcastSet(broadcastSet, input.getTagInternal().getId());
}
}
-// Disabled because it depends on a pending pull request to the DataFlowSDK
- /**
- * Special composite transform translator. Only called if the CoGroup is two dimensional.
- * @param <K>
- */
- private static class CoGroupByKeyTranslatorBatch<K, V1, V2> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<CoGroupByKey<K>> {
-
- @Override
- public void translateNode(CoGroupByKey<K> transform, FlinkBatchTranslationContext context) {
- KeyedPCollectionTuple<K> input = context.getInput(transform);
-
- CoGbkResultSchema schema = input.getCoGbkResultSchema();
- List<KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?>> keyedCollections = input.getKeyedCollections();
-
- KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection1 = keyedCollections.get(0);
- KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection2 = keyedCollections.get(1);
-
- TupleTag<?> tupleTag1 = taggedCollection1.getTupleTag();
- TupleTag<?> tupleTag2 = taggedCollection2.getTupleTag();
-
- PCollection<? extends KV<K, ?>> collection1 = taggedCollection1.getCollection();
- PCollection<? extends KV<K, ?>> collection2 = taggedCollection2.getCollection();
-
- DataSet<KV<K,V1>> inputDataSet1 = context.getInputDataSet(collection1);
- DataSet<KV<K,V2>> inputDataSet2 = context.getInputDataSet(collection2);
-
- TypeInformation<KV<K,CoGbkResult>> typeInfo = context.getOutputTypeInfo();
-
- FlinkCoGroupKeyedListAggregator<K,V1,V2> aggregator = new FlinkCoGroupKeyedListAggregator<>(schema, tupleTag1, tupleTag2);
-
- Keys.ExpressionKeys<KV<K,V1>> keySelector1 = new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet1.getType());
- Keys.ExpressionKeys<KV<K,V2>> keySelector2 = new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet2.getType());
-
- DataSet<KV<K, CoGbkResult>> out = new CoGroupOperator<>(inputDataSet1, inputDataSet2,
- keySelector1, keySelector2,
- aggregator, typeInfo, null, transform.getName());
- context.setOutputDataSet(context.getOutput(transform), out);
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Miscellaneous
- // --------------------------------------------------------------------------------------------
-
private FlinkBatchTransformTranslators() {}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
index 501b1ea..ecc3a65 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
@@ -18,26 +18,28 @@
package org.apache.beam.runners.flink.translation;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.types.KvCoderTypeInformation;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TypedPValue;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import java.util.HashMap;
import java.util.Map;
+/**
+ * Helper for {@link FlinkBatchPipelineTranslator} and translators in
+ * {@link FlinkBatchTransformTranslators}.
+ */
public class FlinkBatchTranslationContext {
private final Map<PValue, DataSet<?>> dataSets;
@@ -81,13 +83,13 @@ public class FlinkBatchTranslationContext {
}
@SuppressWarnings("unchecked")
- public <T> DataSet<T> getInputDataSet(PValue value) {
+ public <T> DataSet<WindowedValue<T>> getInputDataSet(PValue value) {
// assume that the DataSet is used as an input if retrieved here
danglingDataSets.remove(value);
- return (DataSet<T>) dataSets.get(value);
+ return (DataSet<WindowedValue<T>>) dataSets.get(value);
}
- public void setOutputDataSet(PValue value, DataSet<?> set) {
+ public <T> void setOutputDataSet(PValue value, DataSet<WindowedValue<T>> set) {
if (!dataSets.containsKey(value)) {
dataSets.put(value, set);
danglingDataSets.put(value, set);
@@ -107,40 +109,32 @@ public class FlinkBatchTranslationContext {
return (DataSet<T>) broadcastDataSets.get(value);
}
- public void setSideInputDataSet(PCollectionView<?> value, DataSet<?> set) {
+ public <ViewT, ElemT> void setSideInputDataSet(
+ PCollectionView<ViewT> value,
+ DataSet<WindowedValue<ElemT>> set) {
if (!broadcastDataSets.containsKey(value)) {
broadcastDataSets.put(value, set);
}
}
-
- @SuppressWarnings("unchecked")
- public <T> TypeInformation<T> getTypeInfo(PInput output) {
- if (output instanceof TypedPValue) {
- Coder<?> outputCoder = ((TypedPValue) output).getCoder();
- if (outputCoder instanceof KvCoder) {
- return new KvCoderTypeInformation((KvCoder) outputCoder);
- } else {
- return new CoderTypeInformation(outputCoder);
- }
- }
- return new GenericTypeInfo<>((Class<T>)Object.class);
- }
-
- public <T> TypeInformation<T> getInputTypeInfo() {
- return getTypeInfo(currentTransform.getInput());
- }
- public <T> TypeInformation<T> getOutputTypeInfo() {
- return getTypeInfo((PValue) currentTransform.getOutput());
+ @SuppressWarnings("unchecked")
+ public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
+ Coder<T> valueCoder = collection.getCoder();
+ WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+ WindowedValue.getFullCoder(
+ valueCoder,
+ collection.getWindowingStrategy().getWindowFn().windowCoder());
+
+ return new CoderTypeInformation<>(windowedValueCoder);
}
@SuppressWarnings("unchecked")
- <I extends PInput> I getInput(PTransform<I, ?> transform) {
- return (I) currentTransform.getInput();
+ <T extends PInput> T getInput(PTransform<T, ?> transform) {
+ return (T) currentTransform.getInput();
}
@SuppressWarnings("unchecked")
- <O extends POutput> O getOutput(PTransform<?, O> transform) {
- return (O) currentTransform.getOutput();
+ <T extends POutput> T getOutput(PTransform<?, T> transform) {
+ return (T) currentTransform.getOutput();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 2778d5c..b3fed99 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.flink.translation;
-import org.apache.beam.runners.flink.translation.functions.UnionCoder;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.types.FlinkCoder;
import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
@@ -46,6 +45,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -229,29 +229,15 @@ public class FlinkStreamingTransformTranslators {
BoundedSource<T> boundedSource = transform.getSource();
PCollection<T> output = context.getOutput(transform);
- Coder<T> defaultOutputCoder = boundedSource.getDefaultOutputCoder();
- CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(defaultOutputCoder);
+ TypeInformation<WindowedValue<T>> typeInfo = context.getTypeInfo(output);
- DataStream<T> source = context.getExecutionEnvironment().createInput(
+ DataStream<WindowedValue<T>> source = context.getExecutionEnvironment().createInput(
new SourceInputFormat<>(
boundedSource,
context.getPipelineOptions()),
typeInfo);
- DataStream<WindowedValue<T>> windowedStream = source.flatMap(
- new FlatMapFunction<T, WindowedValue<T>>() {
- @Override
- public void flatMap(T value, Collector<WindowedValue<T>> out) throws Exception {
- out.collect(
- WindowedValue.of(value,
- Instant.now(),
- GlobalWindow.INSTANCE,
- PaneInfo.NO_FIRING));
- }
- })
- .assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>());
-
- context.setOutputDataStream(output, windowedStream);
+ context.setOutputDataStream(output, source);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
index 8bc7317..0cb80ba 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
@@ -17,21 +17,30 @@
*/
package org.apache.beam.runners.flink.translation;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.HashMap;
import java.util.Map;
+/**
+ * Helper for keeping track of which {@link DataStream DataStreams} map
+ * to which {@link PTransform PTransforms}.
+ */
public class FlinkStreamingTranslationContext {
private final StreamExecutionEnvironment env;
@@ -80,12 +89,24 @@ public class FlinkStreamingTranslationContext {
}
@SuppressWarnings("unchecked")
- public <I extends PInput> I getInput(PTransform<I, ?> transform) {
- return (I) currentTransform.getInput();
+ public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
+ Coder<T> valueCoder = collection.getCoder();
+ WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+ WindowedValue.getFullCoder(
+ valueCoder,
+ collection.getWindowingStrategy().getWindowFn().windowCoder());
+
+ return new CoderTypeInformation<>(windowedValueCoder);
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public <T extends PInput> T getInput(PTransform<T, ?> transform) {
+ return (T) currentTransform.getInput();
}
@SuppressWarnings("unchecked")
- public <O extends POutput> O getOutput(PTransform<?, O> transform) {
- return (O) currentTransform.getOutput();
+ public <T extends POutput> T getOutput(PTransform<?, T> transform) {
+ return (T) currentTransform.getOutput();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
new file mode 100644
index 0000000..7ea8c20
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+/**
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext} for
+ * Flink functions.
+ */
+class FlinkAssignContext<InputT, W extends BoundedWindow>
+ extends WindowFn<InputT, W>.AssignContext {
+ // The element under assignment, carrying its current timestamp, windows and pane.
+ private final WindowedValue<InputT> value;
+
+ // Wraps {@code value} so that {@code fn} can read the element, its timestamp and
+ // its existing windows while assigning new windows.
+ FlinkAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
+ // AssignContext is a non-static inner class of WindowFn, so the enclosing
+ // instance must be supplied via this qualified super-constructor call.
+ fn.super();
+ this.value = value;
+ }
+
+ @Override
+ public InputT element() {
+ return value.getValue();
+ }
+
+ @Override
+ public Instant timestamp() {
+ return value.getTimestamp();
+ }
+
+ @Override
+ public Collection<? extends BoundedWindow> windows() {
+ return value.getWindows();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
new file mode 100644
index 0000000..e07e49a
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Collection;
+
+/**
+ * Flink {@link FlatMapFunction} for implementing
+ * {@link org.apache.beam.sdk.transforms.windowing.Window.Bound}.
+ *
+ * <p>Applies the given {@link WindowFn} to each input element and emits one copy
+ * of the element per assigned window, keeping the original timestamp and pane.
+ */
+public class FlinkAssignWindows<T, W extends BoundedWindow>
+ implements FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
+
+ // The WindowFn that decides which window(s) each element belongs to.
+ private final WindowFn<T, W> windowFn;
+
+ public FlinkAssignWindows(WindowFn<T, W> windowFn) {
+ this.windowFn = windowFn;
+ }
+
+ @Override
+ public void flatMap(
+ WindowedValue<T> input, Collector<WindowedValue<T>> collector) throws Exception {
+ // Ask the WindowFn for the windows of this element; the FlinkAssignContext
+ // exposes the element, its timestamp and its pre-existing windows to the fn.
+ Collection<W> windows = windowFn.assignWindows(new FlinkAssignContext<>(windowFn, input));
+ // Fan out: one WindowedValue per assigned window, timestamp and pane unchanged.
+ for (W window: windows) {
+ collector.collect(
+ WindowedValue.of(input.getValue(), input.getTimestamp(), window, input.getPane()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java
deleted file mode 100644
index 8e7cdd7..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-public class FlinkCoGroupKeyedListAggregator<K,V1,V2> implements CoGroupFunction<KV<K,V1>, KV<K,V2>, KV<K, CoGbkResult>>{
-
- private CoGbkResultSchema schema;
- private TupleTag<?> tupleTag1;
- private TupleTag<?> tupleTag2;
-
- public FlinkCoGroupKeyedListAggregator(CoGbkResultSchema schema, TupleTag<?> tupleTag1, TupleTag<?> tupleTag2) {
- this.schema = schema;
- this.tupleTag1 = tupleTag1;
- this.tupleTag2 = tupleTag2;
- }
-
- @Override
- public void coGroup(Iterable<KV<K,V1>> first, Iterable<KV<K,V2>> second, Collector<KV<K, CoGbkResult>> out) throws Exception {
- K k = null;
- List<RawUnionValue> result = new ArrayList<>();
- int index1 = schema.getIndex(tupleTag1);
- for (KV<K,?> entry : first) {
- k = entry.getKey();
- result.add(new RawUnionValue(index1, entry.getValue()));
- }
- int index2 = schema.getIndex(tupleTag2);
- for (KV<K,?> entry : second) {
- k = entry.getKey();
- result.add(new RawUnionValue(index2, entry.getValue()));
- }
- out.collect(KV.of(k, new CoGbkResult(schema, result)));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java
deleted file mode 100644
index e5ac748..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer;
-import org.apache.beam.sdk.coders.Coder;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-
-import java.io.ByteArrayInputStream;
-import java.util.List;
-
-/**
- * This is a hack for transforming a {@link org.apache.beam.sdk.transforms.Create}
- * operation. Flink does not allow {@code null} in it's equivalent operation:
- * {@link org.apache.flink.api.java.ExecutionEnvironment#fromElements(Object[])}. Therefore
- * we use a DataSource with one dummy element and output the elements of the Create operation
- * inside this FlatMap.
- */
-public class FlinkCreateFunction<IN, OUT> implements FlatMapFunction<IN, OUT> {
-
- private final List<byte[]> elements;
- private final Coder<OUT> coder;
-
- public FlinkCreateFunction(List<byte[]> elements, Coder<OUT> coder) {
- this.elements = elements;
- this.coder = coder;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void flatMap(IN value, Collector<OUT> out) throws Exception {
-
- for (byte[] element : elements) {
- ByteArrayInputStream bai = new ByteArrayInputStream(element);
- OUT outValue = coder.decode(bai, Coder.Context.OUTER);
- if (outValue == null) {
- // TODO Flink doesn't allow null values in records
- out.collect((OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE);
- } else {
- out.collect(outValue);
- }
- }
-
- out.close();
- }
-}