Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:15:03 UTC
[29/50] [abbrv] beam git commit: [BEAM-1994] Remove Flink examples package
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
new file mode 100644
index 0000000..26fd0b4
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+/**
+ * Special version of {@link FlinkReduceFunction} that supports merging windows. This
+ * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the
+ * same behaviour as {@code MergeOverlappingIntervalWindows}.
+ *
+ * <p>This is different from the pair of functions for the non-merging windows case
+ * in that we cannot do combining before the shuffle because elements would not
+ * yet be in their correct windows for side-input access.
+ */
+public class FlinkMergingNonShuffleReduceFunction<
+ K, InputT, AccumT, OutputT, W extends IntervalWindow>
+ extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> {
+
+ private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn;
+
+ private final WindowingStrategy<?, W> windowingStrategy;
+
+ private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+
+ private final SerializedPipelineOptions serializedOptions;
+
+ public FlinkMergingNonShuffleReduceFunction(
+ CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn,
+ WindowingStrategy<?, W> windowingStrategy,
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+ PipelineOptions pipelineOptions) {
+
+ this.combineFn = keyedCombineFn;
+
+ this.windowingStrategy = windowingStrategy;
+ this.sideInputs = sideInputs;
+
+ this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+
+ }
+
+ @Override
+ public void reduce(
+ Iterable<WindowedValue<KV<K, InputT>>> elements,
+ Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
+
+ PipelineOptions options = serializedOptions.getPipelineOptions();
+
+ FlinkSideInputReader sideInputReader =
+ new FlinkSideInputReader(sideInputs, getRuntimeContext());
+
+ PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
+ PerKeyCombineFnRunners.create(combineFn);
+
+ @SuppressWarnings("unchecked")
+ OutputTimeFn<? super BoundedWindow> outputTimeFn =
+ (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+
+ // get all elements so that we can sort them; they have to fit into
+ // memory
+ // this seems very imprudent, but is correct, for now
+ List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
+ for (WindowedValue<KV<K, InputT>> inputValue : elements) {
+ for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
+ sortedInput.add(exploded);
+ }
+ }
+ Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
+ @Override
+ public int compare(
+ WindowedValue<KV<K, InputT>> o1,
+ WindowedValue<KV<K, InputT>> o2) {
+ return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
+ .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
+ }
+ });
+
+ // merge windows, we have to do it in an extra pre-processing step and
+ // can't do it as we go since the window of early elements would not
+ // be correct when calling the CombineFn
+ mergeWindow(sortedInput);
+
+ // iterate over the elements that are sorted by window timestamp
+ final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();
+
+ // create accumulator using the first element's key
+ WindowedValue<KV<K, InputT>> currentValue = iterator.next();
+ K key = currentValue.getValue().getKey();
+ IntervalWindow currentWindow =
+ (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
+ InputT firstValue = currentValue.getValue().getValue();
+ AccumT accumulator =
+ combineFnRunner.createAccumulator(key, options, sideInputReader, currentValue.getWindows());
+ accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
+ options, sideInputReader, currentValue.getWindows());
+
+ // we use this to keep track of the timestamps assigned by the OutputTimeFn
+ Instant windowTimestamp =
+ outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
+
+ while (iterator.hasNext()) {
+ WindowedValue<KV<K, InputT>> nextValue = iterator.next();
+ IntervalWindow nextWindow =
+ (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
+
+ if (currentWindow.equals(nextWindow)) {
+ // continue accumulating and merge windows
+
+ InputT value = nextValue.getValue().getValue();
+ accumulator = combineFnRunner.addInput(key, accumulator, value,
+ options, sideInputReader, currentValue.getWindows());
+
+ windowTimestamp = outputTimeFn.combine(
+ windowTimestamp,
+ outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
+
+ } else {
+ // emit the value that we currently have
+ out.collect(
+ WindowedValue.of(
+ KV.of(key, combineFnRunner.extractOutput(key, accumulator,
+ options, sideInputReader, currentValue.getWindows())),
+ windowTimestamp,
+ currentWindow,
+ PaneInfo.NO_FIRING));
+
+ currentWindow = nextWindow;
+ currentValue = nextValue;
+ InputT value = nextValue.getValue().getValue();
+ accumulator = combineFnRunner.createAccumulator(key,
+ options, sideInputReader, currentValue.getWindows());
+ accumulator = combineFnRunner.addInput(key, accumulator, value,
+ options, sideInputReader, currentValue.getWindows());
+ windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+ }
+
+ }
+
+ // emit the final accumulator
+ out.collect(
+ WindowedValue.of(
+ KV.of(key, combineFnRunner.extractOutput(key, accumulator,
+ options, sideInputReader, currentValue.getWindows())),
+ windowTimestamp,
+ currentWindow,
+ PaneInfo.NO_FIRING));
+ }
+
+ /**
+ * Merge windows. This assumes that the list of elements is sorted by window-end timestamp.
+ * This replaces windows in the input list.
+ */
+ private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) {
+ int currentStart = 0;
+ IntervalWindow currentWindow =
+ (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows());
+
+ for (int i = 1; i < elements.size(); i++) {
+ WindowedValue<KV<K, InputT>> nextValue = elements.get(i);
+ IntervalWindow nextWindow =
+ (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
+ if (currentWindow.intersects(nextWindow)) {
+ // we continue
+ currentWindow = currentWindow.span(nextWindow);
+ } else {
+ // retrofit the merged window to all windows up to "currentStart"
+ for (int j = i - 1; j >= currentStart; j--) {
+ WindowedValue<KV<K, InputT>> value = elements.get(j);
+ elements.set(
+ j,
+ WindowedValue.of(
+ value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
+ }
+ currentStart = i;
+ currentWindow = nextWindow;
+ }
+ }
+ if (currentStart < elements.size() - 1) {
+ // we have to retrofit the last batch
+ for (int j = elements.size() - 1; j >= currentStart; j--) {
+ WindowedValue<KV<K, InputT>> value = elements.get(j);
+ elements.set(
+ j,
+ WindowedValue.of(
+ value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
+ }
+ }
+ }
+
+}
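The mergeWindow pre-processing above is easier to see in isolation. The following is a minimal, self-contained sketch of the same idea as used in the file above: sort intervals by their end timestamp and span any that intersect. Interval and mergeOverlapping are illustrative names for this sketch only, not Beam or Flink classes.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class MergeOverlappingIntervalsSketch {

  static final class Interval {
    final long start; // inclusive
    final long end;   // exclusive
    Interval(long start, long end) { this.start = start; this.end = end; }
    boolean intersects(Interval other) { return start < other.end && other.start < end; }
    Interval span(Interval other) {
      return new Interval(Math.min(start, other.start), Math.max(end, other.end));
    }
    @Override public String toString() { return "[" + start + ", " + end + ")"; }
  }

  // Same ordering as the reduce functions above: sort by window-end timestamp first.
  static List<Interval> mergeOverlapping(List<Interval> windows) {
    List<Interval> sorted = new ArrayList<>(windows);
    sorted.sort(Comparator.comparingLong(w -> w.end));
    List<Interval> merged = new ArrayList<>();
    Interval current = sorted.get(0);
    for (int i = 1; i < sorted.size(); i++) {
      Interval next = sorted.get(i);
      if (current.intersects(next)) {
        current = current.span(next); // grow the merged window, analogous to IntervalWindow#span
      } else {
        merged.add(current);          // current merged window is complete, start a new one
        current = next;
      }
    }
    merged.add(current);
    return merged;
  }

  public static void main(String[] args) {
    System.out.println(mergeOverlapping(Arrays.asList(
        new Interval(0, 10), new Interval(5, 15), new Interval(20, 30))));
    // prints [[0, 15), [20, 30)]
  }
}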
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
new file mode 100644
index 0000000..c68f155
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+/**
+ * Special version of {@link FlinkPartialReduceFunction} that supports merging windows. This
+ * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the
+ * same behaviour as {@code MergeOverlappingIntervalWindows}.
+ */
+public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends IntervalWindow>
+ extends FlinkPartialReduceFunction<K, InputT, AccumT, W> {
+
+ public FlinkMergingPartialReduceFunction(
+ CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn,
+ WindowingStrategy<?, W> windowingStrategy,
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+ PipelineOptions pipelineOptions) {
+ super(combineFn, windowingStrategy, sideInputs, pipelineOptions);
+ }
+
+ @Override
+ public void combine(
+ Iterable<WindowedValue<KV<K, InputT>>> elements,
+ Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
+
+ PipelineOptions options = serializedOptions.getPipelineOptions();
+
+ FlinkSideInputReader sideInputReader =
+ new FlinkSideInputReader(sideInputs, getRuntimeContext());
+
+ PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
+ PerKeyCombineFnRunners.create(combineFn);
+
+ @SuppressWarnings("unchecked")
+ OutputTimeFn<? super BoundedWindow> outputTimeFn =
+ (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+
+ // get all elements so that we can sort them; they have to fit into
+ // memory
+ // this seems very imprudent, but is correct, for now
+ List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
+ for (WindowedValue<KV<K, InputT>> inputValue : elements) {
+ for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
+ sortedInput.add(exploded);
+ }
+ }
+ Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
+ @Override
+ public int compare(
+ WindowedValue<KV<K, InputT>> o1,
+ WindowedValue<KV<K, InputT>> o2) {
+ return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
+ .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
+ }
+ });
+
+ // merge windows, we have to do it in an extra pre-processing step and
+ // can't do it as we go since the window of early elements would not
+ // be correct when calling the CombineFn
+ mergeWindow(sortedInput);
+
+ // iterate over the elements that are sorted by window timestamp
+ final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();
+
+ // create accumulator using the first element's key
+ WindowedValue<KV<K, InputT>> currentValue = iterator.next();
+ K key = currentValue.getValue().getKey();
+ IntervalWindow currentWindow =
+ (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
+ InputT firstValue = currentValue.getValue().getValue();
+ AccumT accumulator = combineFnRunner.createAccumulator(key,
+ options, sideInputReader, currentValue.getWindows());
+ accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
+ options, sideInputReader, currentValue.getWindows());
+
+ // we use this to keep track of the timestamps assigned by the OutputTimeFn
+ Instant windowTimestamp =
+ outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
+
+ while (iterator.hasNext()) {
+ WindowedValue<KV<K, InputT>> nextValue = iterator.next();
+ IntervalWindow nextWindow = (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
+
+ if (currentWindow.equals(nextWindow)) {
+ // continue accumulating and merge windows
+
+ InputT value = nextValue.getValue().getValue();
+ accumulator = combineFnRunner.addInput(key, accumulator, value,
+ options, sideInputReader, currentValue.getWindows());
+
+ windowTimestamp = outputTimeFn.combine(
+ windowTimestamp,
+ outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
+
+ } else {
+ // emit the value that we currently have
+ out.collect(
+ WindowedValue.of(
+ KV.of(key, accumulator),
+ windowTimestamp,
+ currentWindow,
+ PaneInfo.NO_FIRING));
+
+ currentWindow = nextWindow;
+ currentValue = nextValue;
+ InputT value = nextValue.getValue().getValue();
+ accumulator = combineFnRunner.createAccumulator(key,
+ options, sideInputReader, currentValue.getWindows());
+ accumulator = combineFnRunner.addInput(key, accumulator, value,
+ options, sideInputReader, currentValue.getWindows());
+ windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+ }
+ }
+
+ // emit the final accumulator
+ out.collect(
+ WindowedValue.of(
+ KV.of(key, accumulator),
+ windowTimestamp,
+ currentWindow,
+ PaneInfo.NO_FIRING));
+ }
+
+ /**
+ * Merge windows. This assumes that the list of elements is sorted by window-end timestamp.
+ * This replaces windows in the input list.
+ */
+ private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) {
+ int currentStart = 0;
+ IntervalWindow currentWindow =
+ (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows());
+
+ for (int i = 1; i < elements.size(); i++) {
+ WindowedValue<KV<K, InputT>> nextValue = elements.get(i);
+ IntervalWindow nextWindow =
+ (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
+ if (currentWindow.intersects(nextWindow)) {
+ // we continue
+ currentWindow = currentWindow.span(nextWindow);
+ } else {
+ // retrofit the merged window to all windows up to "currentStart"
+ for (int j = i - 1; j >= currentStart; j--) {
+ WindowedValue<KV<K, InputT>> value = elements.get(j);
+ elements.set(
+ j,
+ WindowedValue.of(
+ value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
+ }
+ currentStart = i;
+ currentWindow = nextWindow;
+ }
+ }
+ if (currentStart < elements.size() - 1) {
+ // we have to retrofit the last batch
+ for (int j = elements.size() - 1; j >= currentStart; j--) {
+ WindowedValue<KV<K, InputT>> value = elements.get(j);
+ elements.set(
+ j,
+ WindowedValue.of(
+ value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
new file mode 100644
index 0000000..84b3adc
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+/**
+ * Special version of {@link FlinkReduceFunction} that supports merging windows. This
+ * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the
+ * same behaviour as {@code MergeOverlappingIntervalWindows}.
+ */
+public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWindow>
+ extends FlinkReduceFunction<K, AccumT, OutputT, W> {
+
+ public FlinkMergingReduceFunction(
+ CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn,
+ WindowingStrategy<?, W> windowingStrategy,
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+ PipelineOptions pipelineOptions) {
+ super(keyedCombineFn, windowingStrategy, sideInputs, pipelineOptions);
+ }
+
+ @Override
+ public void reduce(
+ Iterable<WindowedValue<KV<K, AccumT>>> elements,
+ Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
+
+ PipelineOptions options = serializedOptions.getPipelineOptions();
+
+ FlinkSideInputReader sideInputReader =
+ new FlinkSideInputReader(sideInputs, getRuntimeContext());
+
+ PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
+ PerKeyCombineFnRunners.create(combineFn);
+
+ @SuppressWarnings("unchecked")
+ OutputTimeFn<? super BoundedWindow> outputTimeFn =
+ (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+
+ // get all elements so that we can sort them; they have to fit into
+ // memory
+ // this seems very imprudent, but is correct, for now
+ ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList();
+ for (WindowedValue<KV<K, AccumT>> inputValue : elements) {
+ for (WindowedValue<KV<K, AccumT>> exploded : inputValue.explodeWindows()) {
+ sortedInput.add(exploded);
+ }
+ }
+ Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() {
+ @Override
+ public int compare(
+ WindowedValue<KV<K, AccumT>> o1,
+ WindowedValue<KV<K, AccumT>> o2) {
+ return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
+ .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
+ }
+ });
+
+ // merge windows, we have to do it in an extra pre-processing step and
+ // can't do it as we go since the window of early elements would not
+ // be correct when calling the CombineFn
+ mergeWindow(sortedInput);
+
+ // iterate over the elements that are sorted by window timestamp
+ final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator();
+
+ // get the first accumulator
+ WindowedValue<KV<K, AccumT>> currentValue = iterator.next();
+ K key = currentValue.getValue().getKey();
+ IntervalWindow currentWindow =
+ (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
+ AccumT accumulator = currentValue.getValue().getValue();
+
+ // we use this to keep track of the timestamps assigned by the OutputTimeFn,
+ // in FlinkPartialReduceFunction we already merge the timestamps assigned
+ // to individual elements, here we just merge them
+ List<Instant> windowTimestamps = new ArrayList<>();
+ windowTimestamps.add(currentValue.getTimestamp());
+
+ while (iterator.hasNext()) {
+ WindowedValue<KV<K, AccumT>> nextValue = iterator.next();
+ IntervalWindow nextWindow =
+ (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
+
+ if (nextWindow.equals(currentWindow)) {
+ // continue accumulating and merge windows
+
+ accumulator = combineFnRunner.mergeAccumulators(
+ key, ImmutableList.of(accumulator, nextValue.getValue().getValue()),
+ options, sideInputReader, currentValue.getWindows());
+
+ windowTimestamps.add(nextValue.getTimestamp());
+ } else {
+ out.collect(
+ WindowedValue.of(
+ KV.of(key, combineFnRunner.extractOutput(key, accumulator,
+ options, sideInputReader, currentValue.getWindows())),
+ outputTimeFn.merge(currentWindow, windowTimestamps),
+ currentWindow,
+ PaneInfo.NO_FIRING));
+
+ windowTimestamps.clear();
+
+ currentWindow = nextWindow;
+ currentValue = nextValue;
+ accumulator = nextValue.getValue().getValue();
+ windowTimestamps.add(nextValue.getTimestamp());
+ }
+ }
+
+ // emit the final accumulator
+ out.collect(
+ WindowedValue.of(
+ KV.of(key, combineFnRunner.extractOutput(key, accumulator,
+ options, sideInputReader, currentValue.getWindows())),
+ outputTimeFn.merge(currentWindow, windowTimestamps),
+ currentWindow,
+ PaneInfo.NO_FIRING));
+ }
+
+ /**
+ * Merge windows. This assumes that the list of elements is sorted by window-end timestamp.
+ * This replaces windows in the input list.
+ */
+ private void mergeWindow(List<WindowedValue<KV<K, AccumT>>> elements) {
+ int currentStart = 0;
+ IntervalWindow currentWindow =
+ (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows());
+
+ for (int i = 1; i < elements.size(); i++) {
+ WindowedValue<KV<K, AccumT>> nextValue = elements.get(i);
+ IntervalWindow nextWindow =
+ (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
+ if (currentWindow.intersects(nextWindow)) {
+ // we continue
+ currentWindow = currentWindow.span(nextWindow);
+ } else {
+ // retrofit the merged window to all windows up to "currentStart"
+ for (int j = i - 1; j >= currentStart; j--) {
+ WindowedValue<KV<K, AccumT>> value = elements.get(j);
+ elements.set(
+ j,
+ WindowedValue.of(
+ value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
+ }
+ currentStart = i;
+ currentWindow = nextWindow;
+ }
+ }
+ if (currentStart < elements.size() - 1) {
+ // we have to retrofit the last batch
+ for (int j = elements.size() - 1; j >= currentStart; j--) {
+ WindowedValue<KV<K, AccumT>> value = elements.get(j);
+ elements.set(
+ j,
+ WindowedValue.of(
+ value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
new file mode 100644
index 0000000..9071cc5
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * A {@link FlatMapFunction} that filters out those elements that don't belong in this
+ * output. We need this to implement MultiOutput ParDo functions in combination with
+ * {@link FlinkDoFnFunction}.
+ */
+public class FlinkMultiOutputPruningFunction<T>
+ implements FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<T>> {
+
+ private final int ourOutputTag;
+
+ public FlinkMultiOutputPruningFunction(int ourOutputTag) {
+ this.ourOutputTag = ourOutputTag;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void flatMap(
+ WindowedValue<RawUnionValue> windowedValue,
+ Collector<WindowedValue<T>> collector) throws Exception {
+ int unionTag = windowedValue.getValue().getUnionTag();
+ if (unionTag == ourOutputTag) {
+ collector.collect(
+ (WindowedValue<T>) windowedValue.withValue(windowedValue.getValue().getValue()));
+ }
+ }
+}
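To make the pruning step concrete, here is a small standalone sketch of the same pattern: a multi-output ParDo emits all of its outputs on one stream as (unionTag, value) pairs, and one pruning function per declared output keeps only the pairs whose tag matches. TaggedValue and pruneTo are illustrative stand-ins for RawUnionValue and the flatMap above, not Beam API.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MultiOutputPruningSketch {

  // Stand-in for RawUnionValue: every output of a multi-output ParDo travels
  // on one stream as a (unionTag, value) pair.
  static final class TaggedValue {
    final int unionTag;
    final Object value;
    TaggedValue(int unionTag, Object value) { this.unionTag = unionTag; this.value = value; }
  }

  // Keep only the values that were emitted to the output identified by ourOutputTag.
  static List<Object> pruneTo(int ourOutputTag, List<TaggedValue> mixed) {
    return mixed.stream()
        .filter(tv -> tv.unionTag == ourOutputTag)
        .map(tv -> tv.value)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<TaggedValue> mixed = Arrays.asList(
        new TaggedValue(0, "main-1"), new TaggedValue(1, 42), new TaggedValue(0, "main-2"));
    System.out.println(pruneTo(0, mixed)); // [main-1, main-2]
    System.out.println(pruneTo(1, mixed)); // [42]
  }
}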
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
new file mode 100644
index 0000000..847a00a
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import java.io.IOException;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * A {@link StepContext} for Flink Batch Runner execution.
+ */
+public class FlinkNoOpStepContext implements StepContext {
+
+ @Override
+ public String getStepName() {
+ return null;
+ }
+
+ @Override
+ public String getTransformName() {
+ return null;
+ }
+
+ @Override
+ public void noteOutput(WindowedValue<?> output) {
+
+ }
+
+ @Override
+ public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
+
+ }
+
+ @Override
+ public <T, W extends BoundedWindow> void writePCollectionViewData(
+ TupleTag<?> tag,
+ Iterable<WindowedValue<T>> data,
+ Coder<Iterable<WindowedValue<T>>> dataCoder,
+ W window,
+ Coder<W> windowCoder) throws IOException {
+ }
+
+ @Override
+ public StateInternals<?> stateInternals() {
+ return null;
+ }
+
+ @Override
+ public TimerInternals timerInternals() {
+ return null;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
new file mode 100644
index 0000000..1d1ff9f
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.flink.api.common.functions.RichGroupCombineFunction;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+/**
+ * This is the first step for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey}
+ * on Flink. The second part is {@link FlinkReduceFunction}. This function performs a local
+ * combine step before shuffling while the latter does the final combination after a shuffle.
+ *
+ * <p>The input to {@link #combine(Iterable, Collector)} are elements of the same key but
+ * for different windows. We have to ensure that we only combine elements of matching
+ * windows.
+ */
+public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWindow>
+ extends RichGroupCombineFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, AccumT>>> {
+
+ protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn;
+
+ protected final WindowingStrategy<?, W> windowingStrategy;
+
+ protected final SerializedPipelineOptions serializedOptions;
+
+ protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+
+ public FlinkPartialReduceFunction(
+ CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn,
+ WindowingStrategy<?, W> windowingStrategy,
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+ PipelineOptions pipelineOptions) {
+
+ this.combineFn = combineFn;
+ this.windowingStrategy = windowingStrategy;
+ this.sideInputs = sideInputs;
+ this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+
+ }
+
+ @Override
+ public void combine(
+ Iterable<WindowedValue<KV<K, InputT>>> elements,
+ Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
+
+ PipelineOptions options = serializedOptions.getPipelineOptions();
+
+ FlinkSideInputReader sideInputReader =
+ new FlinkSideInputReader(sideInputs, getRuntimeContext());
+
+ PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
+ PerKeyCombineFnRunners.create(combineFn);
+
+ @SuppressWarnings("unchecked")
+ OutputTimeFn<? super BoundedWindow> outputTimeFn =
+ (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+
+ // get all elements so that we can sort them; they have to fit into
+ // memory
+ // this seems very imprudent, but is correct, for now
+ ArrayList<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
+ for (WindowedValue<KV<K, InputT>> inputValue : elements) {
+ for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
+ sortedInput.add(exploded);
+ }
+ }
+ Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
+ @Override
+ public int compare(
+ WindowedValue<KV<K, InputT>> o1,
+ WindowedValue<KV<K, InputT>> o2) {
+ return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
+ .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
+ }
+ });
+
+ // iterate over the elements that are sorted by window timestamp
+ final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();
+
+ // create accumulator using the first element's key
+ WindowedValue<KV<K, InputT>> currentValue = iterator.next();
+ K key = currentValue.getValue().getKey();
+ BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
+ InputT firstValue = currentValue.getValue().getValue();
+ AccumT accumulator = combineFnRunner.createAccumulator(key,
+ options, sideInputReader, currentValue.getWindows());
+ accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
+ options, sideInputReader, currentValue.getWindows());
+
+ // we use this to keep track of the timestamps assigned by the OutputTimeFn
+ Instant windowTimestamp =
+ outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
+
+ while (iterator.hasNext()) {
+ WindowedValue<KV<K, InputT>> nextValue = iterator.next();
+ BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
+
+ if (nextWindow.equals(currentWindow)) {
+ // continue accumulating
+ InputT value = nextValue.getValue().getValue();
+ accumulator = combineFnRunner.addInput(key, accumulator, value,
+ options, sideInputReader, currentValue.getWindows());
+
+ windowTimestamp = outputTimeFn.combine(
+ windowTimestamp,
+ outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
+
+ } else {
+ // emit the value that we currently have
+ out.collect(
+ WindowedValue.of(
+ KV.of(key, accumulator),
+ windowTimestamp,
+ currentWindow,
+ PaneInfo.NO_FIRING));
+
+ currentWindow = nextWindow;
+ currentValue = nextValue;
+ InputT value = nextValue.getValue().getValue();
+ accumulator = combineFnRunner.createAccumulator(key,
+ options, sideInputReader, currentValue.getWindows());
+ accumulator = combineFnRunner.addInput(key, accumulator, value,
+ options, sideInputReader, currentValue.getWindows());
+ windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+ }
+ }
+
+ // emit the final accumulator
+ out.collect(
+ WindowedValue.of(
+ KV.of(key, accumulator),
+ windowTimestamp,
+ currentWindow,
+ PaneInfo.NO_FIRING));
+ }
+}
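The partial/final split described in the Javadoc above can be illustrated without any Beam machinery. The sketch below computes a mean in two phases: a pre-shuffle phase playing the role of FlinkPartialReduceFunction (createAccumulator/addInput per bundle) and a post-shuffle phase playing the role of FlinkReduceFunction (mergeAccumulators/extractOutput). Class and method names are illustrative, not Beam API.

public class TwoPhaseCombineSketch {

  static final class MeanAccum {
    long sum;
    long count;
  }

  // Phase 1 (before the shuffle): fold the raw inputs of one bundle into an accumulator.
  static MeanAccum partialCombine(long... inputs) {
    MeanAccum acc = new MeanAccum();                      // createAccumulator
    for (long v : inputs) { acc.sum += v; acc.count++; }  // addInput per element
    return acc;
  }

  // Phase 2 (after the shuffle): merge the per-bundle accumulators and extract the output.
  static double finalCombine(MeanAccum... accums) {
    MeanAccum merged = new MeanAccum();                   // mergeAccumulators
    for (MeanAccum a : accums) { merged.sum += a.sum; merged.count += a.count; }
    return (double) merged.sum / merged.count;            // extractOutput
  }

  public static void main(String[] args) {
    MeanAccum bundle1 = partialCombine(1, 2, 3);          // one worker's pre-shuffle combine
    MeanAccum bundle2 = partialCombine(10, 20);           // another worker's pre-shuffle combine
    System.out.println(finalCombine(bundle1, bundle2));   // 7.2
  }
}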
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
new file mode 100644
index 0000000..3e4f742
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+/**
+ * This is the second part for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey}
+ * on Flink; the first part is {@link FlinkPartialReduceFunction}. This function performs the final
+ * combination of the pre-combined values after a shuffle.
+ *
+ * <p>The input to {@link #reduce(Iterable, Collector)} are elements of the same key but
+ * for different windows. We have to ensure that we only combine elements of matching
+ * windows.
+ */
+public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
+ extends RichGroupReduceFunction<WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> {
+
+ protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn;
+
+ protected final WindowingStrategy<?, W> windowingStrategy;
+
+ protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+
+ protected final SerializedPipelineOptions serializedOptions;
+
+ public FlinkReduceFunction(
+ CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn,
+ WindowingStrategy<?, W> windowingStrategy,
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+ PipelineOptions pipelineOptions) {
+
+ this.combineFn = keyedCombineFn;
+
+ this.windowingStrategy = windowingStrategy;
+ this.sideInputs = sideInputs;
+
+ this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+
+ }
+
+ @Override
+ public void reduce(
+ Iterable<WindowedValue<KV<K, AccumT>>> elements,
+ Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
+
+ PipelineOptions options = serializedOptions.getPipelineOptions();
+
+ FlinkSideInputReader sideInputReader =
+ new FlinkSideInputReader(sideInputs, getRuntimeContext());
+
+ PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
+ PerKeyCombineFnRunners.create(combineFn);
+
+ @SuppressWarnings("unchecked")
+ OutputTimeFn<? super BoundedWindow> outputTimeFn =
+ (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+
+
+ // get all elements so that we can sort them; they have to fit into
+ // memory
+ // this seems very imprudent, but is correct, for now
+ ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList();
+ for (WindowedValue<KV<K, AccumT>> inputValue: elements) {
+ for (WindowedValue<KV<K, AccumT>> exploded: inputValue.explodeWindows()) {
+ sortedInput.add(exploded);
+ }
+ }
+ Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() {
+ @Override
+ public int compare(
+ WindowedValue<KV<K, AccumT>> o1,
+ WindowedValue<KV<K, AccumT>> o2) {
+ return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
+ .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
+ }
+ });
+
+ // iterate over the elements that are sorted by window timestamp
+ final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator();
+
+ // get the first accumulator
+ WindowedValue<KV<K, AccumT>> currentValue = iterator.next();
+ K key = currentValue.getValue().getKey();
+ BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
+ AccumT accumulator = currentValue.getValue().getValue();
+
+ // we use this to keep track of the timestamps assigned by the OutputTimeFn,
+ // in FlinkPartialReduceFunction we already merge the timestamps assigned
+ // to individual elements, here we just merge them
+ List<Instant> windowTimestamps = new ArrayList<>();
+ windowTimestamps.add(currentValue.getTimestamp());
+
+ while (iterator.hasNext()) {
+ WindowedValue<KV<K, AccumT>> nextValue = iterator.next();
+ BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
+
+ if (nextWindow.equals(currentWindow)) {
+ // continue accumulating
+ accumulator = combineFnRunner.mergeAccumulators(
+ key, ImmutableList.of(accumulator, nextValue.getValue().getValue()),
+ options, sideInputReader, currentValue.getWindows());
+
+ windowTimestamps.add(nextValue.getTimestamp());
+ } else {
+ // emit the value that we currently have
+ out.collect(
+ WindowedValue.of(
+ KV.of(key, combineFnRunner.extractOutput(key, accumulator,
+ options, sideInputReader, currentValue.getWindows())),
+ outputTimeFn.merge(currentWindow, windowTimestamps),
+ currentWindow,
+ PaneInfo.NO_FIRING));
+
+ windowTimestamps.clear();
+
+ currentWindow = nextWindow;
+ currentValue = nextValue;
+ accumulator = nextValue.getValue().getValue();
+ windowTimestamps.add(nextValue.getTimestamp());
+ }
+
+ }
+
+ // emit the final accumulator
+ out.collect(
+ WindowedValue.of(
+ KV.of(key, combineFnRunner.extractOutput(key, accumulator,
+ options, sideInputReader, currentValue.getWindows())),
+ outputTimeFn.merge(currentWindow, windowTimestamps),
+ currentWindow,
+ PaneInfo.NO_FIRING));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
new file mode 100644
index 0000000..c317182
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * A {@link SideInputReader} for the Flink Batch Runner.
+ */
+public class FlinkSideInputReader implements SideInputReader {
+
+ private final Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs;
+
+ private RuntimeContext runtimeContext;
+
+ public FlinkSideInputReader(Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView,
+ RuntimeContext runtimeContext) {
+ sideInputs = new HashMap<>();
+ for (Map.Entry<PCollectionView<?>, WindowingStrategy<?, ?>> entry : indexByView.entrySet()) {
+ sideInputs.put(entry.getKey().getTagInternal(), entry.getValue());
+ }
+ this.runtimeContext = runtimeContext;
+ }
+
+ @Nullable
+ @Override
+ public <T> T get(PCollectionView<T> view, BoundedWindow window) {
+ checkNotNull(view, "View passed to sideInput cannot be null");
+ TupleTag<Iterable<WindowedValue<?>>> tag = view.getTagInternal();
+ checkNotNull(
+ sideInputs.get(tag),
+ "Side input for " + view + " not available.");
+
+ Map<BoundedWindow, T> sideInputs =
+ runtimeContext.getBroadcastVariableWithInitializer(
+ tag.getId(), new SideInputInitializer<>(view));
+ T result = sideInputs.get(window);
+ if (result == null) {
+ result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
+ }
+ return result;
+ }
+
+ @Override
+ public <T> boolean contains(PCollectionView<T> view) {
+ return sideInputs.containsKey(view.getTagInternal());
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return sideInputs.isEmpty();
+ }
+}
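The reader above resolves a side input by looking the current window up in a per-worker map (a Flink broadcast variable in the real code) and falling back to the view's default when the window has no data. A minimal sketch of that lookup pattern, with illustrative types rather than the Beam/Flink API:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class SideInputLookupSketch {

  // Look up the side-input value for a window; fall back to a default when the
  // window has no materialized data, analogous to the viewFn fallback above.
  static <W, T> T sideInput(Map<W, T> broadcastByWindow, W window, Supplier<T> defaultValue) {
    T result = broadcastByWindow.get(window);
    return result != null ? result : defaultValue.get();
  }

  public static void main(String[] args) {
    Map<String, Long> countsByWindow = new HashMap<>();
    countsByWindow.put("[0, 10)", 7L);
    System.out.println(sideInput(countsByWindow, "[0, 10)", () -> 0L));  // 7
    System.out.println(sideInput(countsByWindow, "[10, 20)", () -> 0L)); // 0 (default)
  }
}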
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
new file mode 100644
index 0000000..c8193d2
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+/**
+ * A {@link RichGroupReduceFunction} for stateful {@link ParDo} in Flink Batch Runner.
+ */
+public class FlinkStatefulDoFnFunction<K, V, OutputT>
+ extends RichGroupReduceFunction<WindowedValue<KV<K, V>>, WindowedValue<OutputT>> {
+
+ private final DoFn<KV<K, V>, OutputT> dofn;
+ private final WindowingStrategy<?, ?> windowingStrategy;
+ private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+ private final SerializedPipelineOptions serializedOptions;
+ private final Map<TupleTag<?>, Integer> outputMap;
+ private final TupleTag<OutputT> mainOutputTag;
+ private transient DoFnInvoker doFnInvoker;
+
+ public FlinkStatefulDoFnFunction(
+ DoFn<KV<K, V>, OutputT> dofn,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+ PipelineOptions pipelineOptions,
+ Map<TupleTag<?>, Integer> outputMap,
+ TupleTag<OutputT> mainOutputTag) {
+
+ this.dofn = dofn;
+ this.windowingStrategy = windowingStrategy;
+ this.sideInputs = sideInputs;
+ this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+ this.outputMap = outputMap;
+ this.mainOutputTag = mainOutputTag;
+ }
+
+ @Override
+ public void reduce(
+ Iterable<WindowedValue<KV<K, V>>> values,
+ Collector<WindowedValue<OutputT>> out) throws Exception {
+ RuntimeContext runtimeContext = getRuntimeContext();
+
+ DoFnRunners.OutputManager outputManager;
+ if (outputMap == null) {
+ outputManager = new FlinkDoFnFunction.DoFnOutputManager(out);
+ } else {
+ // it has some additional Outputs
+ outputManager =
+ new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
+ }
+
+ final Iterator<WindowedValue<KV<K, V>>> iterator = values.iterator();
+
+ // get the first value, we need this for initializing the state internals with the key.
+ // we are guaranteed to have a first value, otherwise reduce() would not have been called.
+ WindowedValue<KV<K, V>> currentValue = iterator.next();
+ final K key = currentValue.getValue().getKey();
+
+ final InMemoryStateInternals<K> stateInternals = InMemoryStateInternals.forKey(key);
+
+ // In batch execution we know that all the data is available for this key. We can't use the
+ // timer manager from the context because it doesn't exist. So we create one and advance
+ // time to the end after processing all elements.
+ final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+ timerInternals.advanceProcessingTime(Instant.now());
+ timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+
+ DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner(
+ serializedOptions.getPipelineOptions(), dofn,
+ new FlinkSideInputReader(sideInputs, runtimeContext),
+ outputManager,
+ mainOutputTag,
+ // see SimpleDoFnRunner, just use it to limit number of additional outputs
+ Collections.<TupleTag<?>>emptyList(),
+ new FlinkNoOpStepContext() {
+ @Override
+ public StateInternals<?> stateInternals() {
+ return stateInternals;
+ }
+ @Override
+ public TimerInternals timerInternals() {
+ return timerInternals;
+ }
+ },
+ new FlinkAggregatorFactory(runtimeContext),
+ windowingStrategy);
+
+ doFnRunner.startBundle();
+
+ doFnRunner.processElement(currentValue);
+ while (iterator.hasNext()) {
+ currentValue = iterator.next();
+ doFnRunner.processElement(currentValue);
+ }
+
+ // Finish any pending windows by advancing the input watermark to infinity.
+ timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+ // Finally, advance the processing time to infinity to fire any timers.
+ timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+ fireEligibleTimers(timerInternals, doFnRunner);
+
+ doFnRunner.finishBundle();
+ }
+
+ private void fireEligibleTimers(
+ InMemoryTimerInternals timerInternals, DoFnRunner<KV<K, V>, OutputT> runner)
+ throws Exception {
+
+ while (true) {
+
+ TimerInternals.TimerData timer;
+ boolean hasFired = false;
+
+ while ((timer = timerInternals.removeNextEventTimer()) != null) {
+ hasFired = true;
+ fireTimer(timer, runner);
+ }
+ while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+ hasFired = true;
+ fireTimer(timer, runner);
+ }
+ while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+ hasFired = true;
+ fireTimer(timer, runner);
+ }
+ if (!hasFired) {
+ break;
+ }
+ }
+ }
+
+ private void fireTimer(
+ TimerInternals.TimerData timer, DoFnRunner<KV<K, V>, OutputT> doFnRunner) {
+ StateNamespace namespace = timer.getNamespace();
+ checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+ BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
+ doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ doFnInvoker = DoFnInvokers.invokerFor(dofn);
+ doFnInvoker.invokeSetup();
+ }
+
+ @Override
+ public void close() throws Exception {
+ doFnInvoker.invokeTeardown();
+ }
+
+}
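The fireEligibleTimers loop above keeps sweeping all three timer queues until a complete pass fires nothing, because firing one timer may register further timers. The standalone sketch below (plain Java with hypothetical timer names, not Beam or Flink API) illustrates that drain-until-quiescent pattern.

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class DrainUntilQuiescentSketch {
      public static void main(String[] args) {
        Deque<String> eventTimers = new ArrayDeque<>();
        Deque<String> processingTimers = new ArrayDeque<>();
        eventTimers.add("window-cleanup");

        boolean hasFired = true;
        while (hasFired) {
          hasFired = false;
          String timer;
          while ((timer = eventTimers.poll()) != null) {
            hasFired = true;
            System.out.println("fired event-time timer: " + timer);
            // Firing a timer may set further timers; that is why the outer loop repeats.
            if (timer.equals("window-cleanup")) {
              processingTimers.add("flush-output");
            }
          }
          while ((timer = processingTimers.poll()) != null) {
            hasFired = true;
            System.out.println("fired processing-time timer: " + timer);
          }
        }
      }
    }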
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
new file mode 100644
index 0000000..12222b4
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+
+/**
+ * {@link BroadcastVariableInitializer} that initializes the broadcast input as a {@code Map}
+ * from window to side input.
+ */
+public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow>
+ implements BroadcastVariableInitializer<WindowedValue<ElemT>, Map<BoundedWindow, ViewT>> {
+
+ PCollectionView<ViewT> view;
+
+ public SideInputInitializer(PCollectionView<ViewT> view) {
+ this.view = view;
+ }
+
+ @Override
+ public Map<BoundedWindow, ViewT> initializeBroadcastVariable(
+ Iterable<WindowedValue<ElemT>> inputValues) {
+
+ // first partition into windows
+ Map<BoundedWindow, List<WindowedValue<ElemT>>> partitionedElements = new HashMap<>();
+ for (WindowedValue<ElemT> value: inputValues) {
+ for (BoundedWindow window: value.getWindows()) {
+ List<WindowedValue<ElemT>> windowedValues = partitionedElements.get(window);
+ if (windowedValues == null) {
+ windowedValues = new ArrayList<>();
+ partitionedElements.put(window, windowedValues);
+ }
+ windowedValues.add(value);
+ }
+ }
+
+ Map<BoundedWindow, ViewT> resultMap = new HashMap<>();
+
+ for (Map.Entry<BoundedWindow, List<WindowedValue<ElemT>>> elements:
+ partitionedElements.entrySet()) {
+
+ @SuppressWarnings("unchecked")
+ Iterable<WindowedValue<?>> elementsIterable =
+ (List<WindowedValue<?>>) (List<?>) elements.getValue();
+
+ resultMap.put(elements.getKey(), view.getViewFn().apply(elementsIterable));
+ }
+
+ return resultMap;
+ }
+}
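For context, a Flink rich function can materialize this window-to-side-input map through the DataSet broadcast-variable mechanism. The sketch below is only an assumption about how a consumer might wire it up: the RichMapFunction, its String element type, and the broadcast variable name "side-input" are placeholders, and the real wiring lives in the runner's batch translators, not in this commit.

    import java.util.Map;
    import org.apache.beam.runners.flink.translation.functions.SideInputInitializer;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.values.PCollectionView;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    /** Hypothetical consumer of the side-input map built by SideInputInitializer. */
    public class SideInputConsumingMapper extends RichMapFunction<String, String> {

      private final PCollectionView<Iterable<String>> view;
      private transient Map<BoundedWindow, Iterable<String>> sideInputByWindow;

      public SideInputConsumingMapper(PCollectionView<Iterable<String>> view) {
        this.view = view;
      }

      @Override
      public void open(Configuration parameters) {
        // "side-input" is a placeholder broadcast variable name.
        sideInputByWindow = getRuntimeContext().getBroadcastVariableWithInitializer(
            "side-input",
            new SideInputInitializer<Object, Iterable<String>, BoundedWindow>(view));
      }

      @Override
      public String map(String value) {
        // Look up the side input for the current element's window(s) as needed.
        return value + " [windows with side input: " + sideInputByWindow.size() + "]";
      }
    }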
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
new file mode 100644
index 0000000..9f11212
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.functions;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
new file mode 100644
index 0000000..af4b354
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
new file mode 100644
index 0000000..9b449aa
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Flink {@link org.apache.flink.api.common.typeinfo.TypeInformation} for
+ * Beam {@link org.apache.beam.sdk.coders.Coder}s.
+ */
+public class CoderTypeInformation<T> extends TypeInformation<T> implements AtomicType<T> {
+
+ private final Coder<T> coder;
+
+ public CoderTypeInformation(Coder<T> coder) {
+ checkNotNull(coder);
+ this.coder = coder;
+ }
+
+ public Coder<T> getCoder() {
+ return coder;
+ }
+
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 1;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Class<T> getTypeClass() {
+ // We don't have the concrete Class, so fall back to Object.class.
+ return (Class<T>) Object.class;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return true;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+ return new CoderTypeSerializer<>(coder);
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 2;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ CoderTypeInformation that = (CoderTypeInformation) o;
+
+ return coder.equals(that.coder);
+
+ }
+
+ @Override
+ public int hashCode() {
+ return coder.hashCode();
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof CoderTypeInformation;
+ }
+
+ @Override
+ public String toString() {
+ return "CoderTypeInformation{coder=" + coder + '}';
+ }
+
+ @Override
+ public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig
+ executionConfig) {
+ throw new UnsupportedOperationException(
+ "Non-encoded values cannot be compared directly.");
+ }
+}
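A minimal usage sketch (StringUtf8Coder is an assumption, any Beam coder would do; this is not taken from the commit itself): wrap a coder as Flink type information and obtain a serializer from it.

    import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeSerializer;

    public class CoderTypeInformationExample {
      public static void main(String[] args) {
        // Wrap a Beam coder so Flink can treat the coded type as a first-class type.
        CoderTypeInformation<String> typeInfo =
            new CoderTypeInformation<>(StringUtf8Coder.of());

        // The serializer delegates to the coder for encoding and decoding.
        TypeSerializer<String> serializer = typeInfo.createSerializer(new ExecutionConfig());
        System.out.println(typeInfo + " -> " + serializer.getClass().getSimpleName());
      }
    }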
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
new file mode 100644
index 0000000..e210ed9
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import java.io.EOFException;
+import java.io.IOException;
+import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for
+ * Beam {@link org.apache.beam.sdk.coders.Coder Coders}.
+ */
+public class CoderTypeSerializer<T> extends TypeSerializer<T> {
+
+ private final Coder<T> coder;
+
+ public CoderTypeSerializer(Coder<T> coder) {
+ this.coder = coder;
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public CoderTypeSerializer<T> duplicate() {
+ return new CoderTypeSerializer<>(coder);
+ }
+
+ @Override
+ public T createInstance() {
+ return null;
+ }
+
+ @Override
+ public T copy(T t) {
+ try {
+ return CoderUtils.clone(coder, t);
+ } catch (CoderException e) {
+ throw new RuntimeException("Could not clone.", e);
+ }
+ }
+
+ @Override
+ public T copy(T t, T reuse) {
+ return copy(t);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(T t, DataOutputView dataOutputView) throws IOException {
+ DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView);
+ coder.encode(t, outputWrapper, Coder.Context.NESTED);
+ }
+
+ @Override
+ public T deserialize(DataInputView dataInputView) throws IOException {
+ try {
+ DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView);
+ return coder.decode(inputWrapper, Coder.Context.NESTED);
+ } catch (CoderException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof EOFException) {
+ throw (EOFException) cause;
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public T deserialize(T t, DataInputView dataInputView) throws IOException {
+ return deserialize(dataInputView);
+ }
+
+ @Override
+ public void copy(
+ DataInputView dataInputView,
+ DataOutputView dataOutputView) throws IOException {
+ serialize(deserialize(dataInputView), dataOutputView);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ CoderTypeSerializer that = (CoderTypeSerializer) o;
+ return coder.equals(that.coder);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof CoderTypeSerializer;
+ }
+
+ @Override
+ public int hashCode() {
+ return coder.hashCode();
+ }
+}
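A small hedged example of the coder-backed deep copy (StringUtf8Coder is an assumption; any Beam coder works): copy() clones a value by encoding and decoding it through the coder.

    import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
    import org.apache.beam.sdk.coders.StringUtf8Coder;

    public class CoderTypeSerializerExample {
      public static void main(String[] args) {
        CoderTypeSerializer<String> serializer =
            new CoderTypeSerializer<>(StringUtf8Coder.of());

        // copy() round-trips the value through the coder (CoderUtils.clone).
        String original = "hello beam";
        String cloned = serializer.copy(original);
        System.out.println(original.equals(cloned)); // true
      }
    }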
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java
new file mode 100644
index 0000000..667ef45
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for Beam values that have
+ * been encoded to byte data by a {@link Coder}.
+ */
+public class EncodedValueComparator extends TypeComparator<byte[]> {
+
+ /** The reference key, stored in encoded form. */
+ private transient byte[] encodedReferenceKey;
+
+ private final boolean ascending;
+
+ public EncodedValueComparator(boolean ascending) {
+ this.ascending = ascending;
+ }
+
+ @Override
+ public int hash(byte[] record) {
+ return Arrays.hashCode(record);
+ }
+
+ @Override
+ public void setReference(byte[] toCompare) {
+ this.encodedReferenceKey = toCompare;
+ }
+
+ @Override
+ public boolean equalToReference(byte[] candidate) {
+ if (encodedReferenceKey.length != candidate.length) {
+ return false;
+ }
+ int len = candidate.length;
+ for (int i = 0; i < len; i++) {
+ if (encodedReferenceKey[i] != candidate[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<byte[]> other) {
+ // VERY IMPORTANT: compareToReference does not behave like Comparable.compare;
+ // the meaning of the return value is inverted.
+
+ EncodedValueComparator otherEncodedValueComparator = (EncodedValueComparator) other;
+
+ int len = Math.min(
+ encodedReferenceKey.length,
+ otherEncodedValueComparator.encodedReferenceKey.length);
+
+ for (int i = 0; i < len; i++) {
+ byte b1 = encodedReferenceKey[i];
+ byte b2 = otherEncodedValueComparator.encodedReferenceKey[i];
+ int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));
+ if (result != 0) {
+ return ascending ? -result : result;
+ }
+ }
+ int result =
+ encodedReferenceKey.length - otherEncodedValueComparator.encodedReferenceKey.length;
+ return ascending ? -result : result;
+ }
+
+
+ @Override
+ public int compare(byte[] first, byte[] second) {
+ int len = Math.min(first.length, second.length);
+ for (int i = 0; i < len; i++) {
+ byte b1 = first[i];
+ byte b2 = second[i];
+ int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));
+ if (result != 0) {
+ return ascending ? result : -result;
+ }
+ }
+ int result = first.length - second.length;
+ return ascending ? result : -result;
+ }
+
+ @Override
+ public int compareSerialized(
+ DataInputView firstSource,
+ DataInputView secondSource) throws IOException {
+ int lengthFirst = firstSource.readInt();
+ int lengthSecond = secondSource.readInt();
+
+ int len = Math.min(lengthFirst, lengthSecond);
+ for (int i = 0; i < len; i++) {
+ byte b1 = firstSource.readByte();
+ byte b2 = secondSource.readByte();
+ int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));
+ if (result != 0) {
+ return ascending ? result : -result;
+ }
+ }
+
+ int result = lengthFirst - lengthSecond;
+ return ascending ? result : -result;
+ }
+
+
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ // Disabled because this does not seem to work with some coders,
+ // such as the AvroCoder.
+ return false;
+ }
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false;
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return true;
+ }
+
+ @Override
+ public void putNormalizedKey(byte[] record, MemorySegment target, int offset, int numBytes) {
+ final int limit = offset + numBytes;
+
+ target.put(offset, record, 0, Math.min(numBytes, record.length));
+
+ offset += record.length;
+
+ while (offset < limit) {
+ target.put(offset++, (byte) 0);
+ }
+ }
+
+ @Override
+ public void writeWithKeyNormalization(byte[] record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public byte[] readWithKeyDenormalization(byte[] reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascending;
+ }
+
+ @Override
+ public TypeComparator<byte[]> duplicate() {
+ return new EncodedValueComparator(ascending);
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record;
+ return 1;
+ }
+
+ @Override
+ public TypeComparator[] getFlatComparators() {
+ return new TypeComparator[] { this.duplicate() };
+ }
+}
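The comparator only ever sees coder-encoded bytes, so ordering is byte-wise over the encodings. A hedged sketch of that behaviour (StringUtf8Coder and CoderUtils are Beam SDK classes; the sample values are illustrative only):

    import org.apache.beam.runners.flink.translation.types.EncodedValueComparator;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.util.CoderUtils;

    public class EncodedValueComparatorExample {
      public static void main(String[] args) throws Exception {
        EncodedValueComparator comparator = new EncodedValueComparator(true /* ascending */);

        // Encode two values with the same coder; the comparator only compares the bytes.
        byte[] apple = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "apple");
        byte[] banana = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "banana");

        System.out.println(comparator.compare(apple, banana) < 0); // true: "apple" sorts first
        comparator.setReference(apple);
        System.out.println(comparator.equalToReference(banana));   // false
      }
    }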
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
new file mode 100644
index 0000000..41db61e
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.coders.Coder;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * {@link TypeSerializer} for values that were encoded using a {@link Coder}.
+ */
+public final class EncodedValueSerializer extends TypeSerializer<byte[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final byte[] EMPTY = new byte[0];
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public byte[] createInstance() {
+ return EMPTY;
+ }
+
+ @Override
+ public byte[] copy(byte[] from) {
+ return from;
+ }
+
+ @Override
+ public byte[] copy(byte[] from, byte[] reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+
+ @Override
+ public void serialize(byte[] record, DataOutputView target) throws IOException {
+ if (record == null) {
+ throw new IllegalArgumentException("The record must not be null.");
+ }
+
+ final int len = record.length;
+ target.writeInt(len);
+ target.write(record);
+ }
+
+ @Override
+ public byte[] deserialize(DataInputView source) throws IOException {
+ final int len = source.readInt();
+ byte[] result = new byte[len];
+ source.readFully(result);
+ return result;
+ }
+
+ @Override
+ public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ final int len = source.readInt();
+ target.writeInt(len);
+ target.write(source, len);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof EncodedValueSerializer;
+ }
+
+ @Override
+ public int hashCode() {
+ return this.getClass().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof EncodedValueSerializer;
+ }
+
+ @Override
+ public TypeSerializer<byte[]> duplicate() {
+ return this;
+ }
+}