You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/24 19:52:49 UTC
[11/17] incubator-beam git commit: [BEAM-102] Add Side Inputs in
Flink Streaming Runner
[BEAM-102] Add Side Inputs in Flink Streaming Runner
This adds a generic SideInputHandler in runners-core that is only used
by the Flink runner right now but can be used by other runner
implementations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dfbdc6c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dfbdc6c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dfbdc6c2
Branch: refs/heads/master
Commit: dfbdc6c2bbef5e749bfc1800f97d21377f0c713d
Parents: ff34f9e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jul 11 14:08:35 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Aug 24 12:46:24 2016 -0700
----------------------------------------------------------------------
.../beam/runners/core/SideInputHandler.java | 240 ++++++
.../beam/runners/core/SideInputHandlerTest.java | 222 ++++++
.../apache/beam/runners/flink/FlinkRunner.java | 386 +++++++++-
.../beam/runners/flink/TestFlinkRunner.java | 4 +-
.../FlinkStreamingPipelineTranslator.java | 59 +-
.../FlinkStreamingTransformTranslators.java | 727 +++++++++++++------
.../translation/types/CoderTypeInformation.java | 4 +
.../wrappers/streaming/DoFnOperator.java | 282 ++++++-
.../wrappers/streaming/WindowDoFnOperator.java | 47 +-
.../streaming/io/BoundedSourceWrapper.java | 219 ++++++
.../io/FlinkStreamingCreateFunction.java | 56 --
.../flink/streaming/DoFnOperatorTest.java | 328 +++++++++
.../streaming/UnboundedSourceWrapperTest.java | 2 +-
.../beam/sdk/transforms/join/RawUnionValue.java | 25 +
14 files changed, 2270 insertions(+), 331 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
new file mode 100644
index 0000000..6550251
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SetCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+/**
+ * Generic side input handler that uses {@link StateInternals} to store all data. Both the actual
+ * side-input data and data about the windows for which we have side inputs available are stored
+ * using {@code StateInternals}.
+ *
+ * <p>The given {@code StateInternals} must not be scoped to an element key. The state
+ * must instead be scoped to one key group for which the side input is being managed.
+ *
+ * <p>This is useful for runners that transmit the side-input elements in band, as opposed
+ * to how Dataflow has an external service for managing side inputs.
+ *
+ * <p>Note: storing the available windows in an extra state is redundant for now but in the
+ * future we might want to know which windows we have available so that we can garbage collect
+ * side input data. For now, this will never clean up side-input data because we have no way
+ * of knowing when we reach the GC horizon.
+ */
+public class SideInputHandler implements ReadyCheckingSideInputReader {
+
+ /** The list of side inputs that we're handling. */
+ protected final Collection<PCollectionView<?>> sideInputs;
+
+ /** State internals that are scoped not to the key of a value but instead to one key group. */
+ private final StateInternals<Void> stateInternals;
+
+ /**
+ * A state tag for each side input that we handle. The state is used to track
+ * for which windows we have input available.
+ */
+ private final Map<
+ PCollectionView<?>,
+ StateTag<
+ Object,
+ AccumulatorCombiningState<
+ BoundedWindow,
+ Set<BoundedWindow>,
+ Set<BoundedWindow>>>> availableWindowsTags;
+
+ /**
+ * State tag for the actual contents of each side input per window.
+ */
+ private final Map<
+ PCollectionView<?>,
+ StateTag<Object, ValueState<Iterable<WindowedValue<?>>>>> sideInputContentsTags;
+
+ /**
+ * Creates a new {@code SideInputHandler} for the given side inputs that uses
+ * the given {@code StateInternals} to store side input data and side-input meta data.
+ */
+ public SideInputHandler(
+ Collection<PCollectionView<?>> sideInputs,
+ StateInternals<Void> stateInternals) {
+ this.sideInputs = sideInputs;
+ this.stateInternals = stateInternals;
+ this.availableWindowsTags = new HashMap<>();
+ this.sideInputContentsTags = new HashMap<>();
+
+ for (PCollectionView<?> sideInput: sideInputs) {
+
+ @SuppressWarnings("unchecked")
+ Coder<BoundedWindow> windowCoder =
+ (Coder<BoundedWindow>) sideInput
+ .getWindowingStrategyInternal()
+ .getWindowFn()
+ .windowCoder();
+
+ StateTag<
+ Object,
+ AccumulatorCombiningState<
+ BoundedWindow,
+ Set<BoundedWindow>,
+ Set<BoundedWindow>>> availableTag = StateTags.combiningValue(
+ "side-input-available-windows-" + sideInput.getTagInternal().getId(),
+ SetCoder.of(windowCoder),
+ new WindowSetCombineFn());
+
+ availableWindowsTags.put(sideInput, availableTag);
+
+ Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
+ StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag =
+ StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(), coder);
+ sideInputContentsTags.put(sideInput, stateTag);
+ }
+ }
+
+ /**
+ * Add the given value to the internal side-input store of the given side input. This
+ * might change the result of {@link #isReady(PCollectionView, BoundedWindow)} for that side
+ * input.
+ */
+ public void addSideInputValue(
+ PCollectionView<?> sideInput,
+ WindowedValue<Iterable<?>> value) {
+
+ @SuppressWarnings("unchecked")
+ Coder<BoundedWindow> windowCoder =
+ (Coder<BoundedWindow>) sideInput
+ .getWindowingStrategyInternal()
+ .getWindowFn()
+ .windowCoder();
+
+ // reify the WindowedValue
+ List<WindowedValue<?>> inputWithReifiedWindows = new ArrayList<>();
+ for (Object e: value.getValue()) {
+ inputWithReifiedWindows.add(value.withValue(e));
+ }
+
+ StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag =
+ sideInputContentsTags.get(sideInput);
+
+ for (BoundedWindow window: value.getWindows()) {
+ stateInternals
+ .state(StateNamespaces.window(windowCoder, window), stateTag)
+ .write(inputWithReifiedWindows);
+
+ stateInternals
+ .state(StateNamespaces.global(), availableWindowsTags.get(sideInput))
+ .add(window);
+ }
+ }
+
+ @Nullable
+ @Override
+ public <T> T get(PCollectionView<T> sideInput, BoundedWindow window) {
+
+ if (!isReady(sideInput, window)) {
+ throw new IllegalStateException(
+ "Side input " + sideInput + " is not ready for window " + window);
+ }
+
+ @SuppressWarnings("unchecked")
+ Coder<BoundedWindow> windowCoder =
+ (Coder<BoundedWindow>) sideInput
+ .getWindowingStrategyInternal()
+ .getWindowFn()
+ .windowCoder();
+
+ StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag =
+ sideInputContentsTags.get(sideInput);
+
+ ValueState<Iterable<WindowedValue<?>>> state =
+ stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
+
+ Iterable<WindowedValue<?>> elements = state.read();
+
+ return sideInput.fromIterableInternal(elements);
+ }
+
+ @Override
+ public boolean isReady(PCollectionView<?> sideInput, BoundedWindow window) {
+ Set<BoundedWindow> readyWindows =
+ stateInternals.state(StateNamespaces.global(), availableWindowsTags.get(sideInput)).read();
+
+ boolean result = readyWindows != null && readyWindows.contains(window);
+ return result;
+ }
+
+ @Override
+ public <T> boolean contains(PCollectionView<T> view) {
+ return sideInputs.contains(view);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return sideInputs.isEmpty();
+ }
+
+ /**
+ * For keeping track of the windows for which we have available side input.
+ */
+ private static class WindowSetCombineFn
+ extends Combine.CombineFn<BoundedWindow, Set<BoundedWindow>, Set<BoundedWindow>> {
+
+ @Override
+ public Set<BoundedWindow> createAccumulator() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set<BoundedWindow> addInput(Set<BoundedWindow> accumulator, BoundedWindow input) {
+ accumulator.add(input);
+ return accumulator;
+ }
+
+ @Override
+ public Set<BoundedWindow> mergeAccumulators(Iterable<Set<BoundedWindow>> accumulators) {
+ Set<BoundedWindow> result = new HashSet<>();
+ for (Set<BoundedWindow> acc: accumulators) {
+ result.addAll(acc);
+ }
+ return result;
+ }
+
+ @Override
+ public Set<BoundedWindow> extractOutput(Set<BoundedWindow> accumulator) {
+ return accumulator;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
new file mode 100644
index 0000000..641e25e
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.PCollectionViewTesting;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link SideInputHandler}.
+ */
+@RunWith(JUnit4.class)
+public class SideInputHandlerTest {
+
+ private static final long WINDOW_MSECS_1 = 100;
+ private static final long WINDOW_MSECS_2 = 500;
+
+ private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 =
+ WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));
+
+ private PCollectionView<Iterable<String>> view1 = PCollectionViewTesting.testingView(
+ new TupleTag<Iterable<WindowedValue<String>>>() {},
+ new PCollectionViewTesting.IdentityViewFn<String>(),
+ StringUtf8Coder.of(),
+ windowingStrategy1);
+
+ private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 =
+ WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));
+
+ private PCollectionView<Iterable<String>> view2 = PCollectionViewTesting.testingView(
+ new TupleTag<Iterable<WindowedValue<String>>>() {},
+ new PCollectionViewTesting.IdentityViewFn<String>(),
+ StringUtf8Coder.of(),
+ windowingStrategy2);
+
+ @Test
+ public void testIsEmpty() {
+ SideInputHandler sideInputHandler = new SideInputHandler(
+ ImmutableList.<PCollectionView<?>>of(view1),
+ InMemoryStateInternals.<Void>forKey(null));
+
+ assertFalse(sideInputHandler.isEmpty());
+
+ // create an empty handler
+ SideInputHandler emptySideInputHandler = new SideInputHandler(
+ ImmutableList.<PCollectionView<?>>of(),
+ InMemoryStateInternals.<Void>forKey(null));
+
+ assertTrue(emptySideInputHandler.isEmpty());
+ }
+
+ @Test
+ public void testContains() {
+ SideInputHandler sideInputHandler = new SideInputHandler(
+ ImmutableList.<PCollectionView<?>>of(view1),
+ InMemoryStateInternals.<Void>forKey(null));
+
+ assertTrue(sideInputHandler.contains(view1));
+ assertFalse(sideInputHandler.contains(view2));
+ }
+
+ @Test
+ public void testIsReady() {
+ SideInputHandler sideInputHandler = new SideInputHandler(
+ ImmutableList.<PCollectionView<?>>of(view1, view2),
+ InMemoryStateInternals.<Void>forKey(null));
+
+ IntervalWindow firstWindow =
+ new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_1));
+
+ IntervalWindow secondWindow =
+ new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_2));
+
+
+ // side input should not yet be ready
+ assertFalse(sideInputHandler.isReady(view1, firstWindow));
+
+ // add a value for view1
+ sideInputHandler.addSideInputValue(
+ view1,
+ valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+
+ // now side input should be ready
+ assertTrue(sideInputHandler.isReady(view1, firstWindow));
+
+ // second window input should still not be ready
+ assertFalse(sideInputHandler.isReady(view1, secondWindow));
+ }
+
+ @Test
+ public void testNewInputReplacesPreviousInput() {
+ // new input should completely replace old input
+ // the creation of the Iterable that has the side input
+ // contents happens upstream. this is also where
+ // accumulation/discarding is decided.
+
+ SideInputHandler sideInputHandler = new SideInputHandler(
+ ImmutableList.<PCollectionView<?>>of(view1),
+ InMemoryStateInternals.<Void>forKey(null));
+
+ IntervalWindow window =
+ new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_1));
+
+ // add a first value for view1
+ sideInputHandler.addSideInputValue(
+ view1,
+ valuesInWindow(ImmutableList.of("Hello"), new Instant(0), window));
+
+ Assert.assertThat(sideInputHandler.get(view1, window), contains("Hello"));
+
+ // subsequent values should replace existing values
+ sideInputHandler.addSideInputValue(
+ view1,
+ valuesInWindow(ImmutableList.of("Ciao", "Buongiorno"), new Instant(0), window));
+
+ Assert.assertThat(sideInputHandler.get(view1, window), contains("Ciao", "Buongiorno"));
+ }
+
+ @Test
+ public void testMultipleWindows() {
+ SideInputHandler sideInputHandler = new SideInputHandler(
+ ImmutableList.<PCollectionView<?>>of(view1),
+ InMemoryStateInternals.<Void>forKey(null));
+
+ // two windows that we'll later use for adding elements/retrieving side input
+ IntervalWindow firstWindow =
+ new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_1));
+ IntervalWindow secondWindow =
+ new IntervalWindow(new Instant(1000), new Instant(1000 + WINDOW_MSECS_2));
+
+ // add a first value for view1 in the first window
+ sideInputHandler.addSideInputValue(
+ view1,
+ valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+
+ Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+
+ // add something for second window of view1
+ sideInputHandler.addSideInputValue(
+ view1,
+ valuesInWindow(ImmutableList.of("Arrivederci"), new Instant(0), secondWindow));
+
+ Assert.assertThat(sideInputHandler.get(view1, secondWindow), contains("Arrivederci"));
+
+ // contents for first window should be unaffected
+ Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+ }
+
+ @Test
+ public void testMultipleSideInputs() {
+ SideInputHandler sideInputHandler = new SideInputHandler(
+ ImmutableList.<PCollectionView<?>>of(view1, view2),
+ InMemoryStateInternals.<Void>forKey(null));
+
+ // two windows that we'll later use for adding elements/retrieving side input
+ IntervalWindow firstWindow =
+ new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_1));
+
+ // add value for view1 in the first window
+ sideInputHandler.addSideInputValue(
+ view1,
+ valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+
+ Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+
+ // view2 should not have any data
+ assertFalse(sideInputHandler.isReady(view2, firstWindow));
+
+ // also add some data for view2
+ sideInputHandler.addSideInputValue(
+ view2,
+ valuesInWindow(ImmutableList.of("Salut"), new Instant(0), firstWindow));
+
+ assertTrue(sideInputHandler.isReady(view2, firstWindow));
+ Assert.assertThat(sideInputHandler.get(view2, firstWindow), contains("Salut"));
+
+ // view1 should not be affected by that
+ Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private WindowedValue<Iterable<?>> valuesInWindow(
+ Iterable<?> values, Instant timestamp, BoundedWindow window) {
+ return (WindowedValue) WindowedValue.of(values, timestamp, window, PaneInfo.NO_FIRING);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 47c4877..b0e88b7 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -18,14 +18,28 @@
package org.apache.beam.runners.flink;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.JobExecutionResult;
import org.slf4j.Logger;
@@ -36,6 +50,7 @@ import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -54,6 +69,9 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
*/
private final FlinkPipelineOptions options;
+ /** Custom transforms implementations. */
+ private final Map<Class<?>, Class<?>> overrides;
+
/**
* Construct a runner from the provided options.
*
@@ -93,6 +111,18 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
private FlinkRunner(FlinkPipelineOptions options) {
this.options = options;
+
+ ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, Class<?>>builder();
+ if (options.isStreaming()) {
+ builder.put(Combine.GloballyAsSingletonView.class,
+ StreamingCombineGloballyAsSingletonView.class);
+ builder.put(View.AsMap.class, StreamingViewAsMap.class);
+ builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class);
+ builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
+ builder.put(View.AsList.class, StreamingViewAsList.class);
+ builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
+ }
+ overrides = builder.build();
}
@Override
@@ -135,9 +165,27 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
}
@Override
- public <Output extends POutput, Input extends PInput> Output apply(
- PTransform<Input, Output> transform, Input input) {
- return super.apply(transform, input);
+ public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+ PTransform<InputT, OutputT> transform, InputT input) {
+ if (overrides.containsKey(transform.getClass())) {
+ // It is the responsibility of whoever constructs overrides to ensure this is type safe.
+ @SuppressWarnings("unchecked")
+ Class<PTransform<InputT, OutputT>> transformClass =
+ (Class<PTransform<InputT, OutputT>>) transform.getClass();
+
+ @SuppressWarnings("unchecked")
+ Class<PTransform<InputT, OutputT>> customTransformClass =
+ (Class<PTransform<InputT, OutputT>>) overrides.get(transform.getClass());
+
+ PTransform<InputT, OutputT> customTransform =
+ InstanceBuilder.ofType(customTransformClass)
+ .withArg(transformClass, transform)
+ .build();
+
+ return Pipeline.applyTransform(input, customTransform);
+ } else {
+ return super.apply(transform, input);
+ }
}
/////////////////////////////////////////////////////////////////////////////
@@ -154,9 +202,10 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
* @param classLoader The URLClassLoader to use to detect resources to stage.
* @return A list of absolute paths to the resources the class loader uses.
* @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
- * of the resources the class loader exposes is not a file resource.
+ * of the resources the class loader exposes is not a file resource.
*/
- protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
+ protected static List<String> detectClassPathResourcesToStage(
+ ClassLoader classLoader) {
if (!(classLoader instanceof URLClassLoader)) {
String message = String.format("Unable to use ClassLoader to detect classpath elements. "
+ "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
@@ -176,4 +225,331 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
}
return files;
}
+
+ /**
+ * Specialized implementation for
+ * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+ * for the Flink runner in streaming mode.
+ */
+ private static class StreamingViewAsMap<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingViewAsMap(View.AsMap<K, V> transform) {
+ }
+
+ @Override
+ public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+ PCollectionView<Map<K, V>> view =
+ PCollectionViews.mapView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+ try {
+ inputCoder.getKeyCoder().verifyDeterministic();
+ } catch (Coder.NonDeterministicException e) {
+// runner.recordViewUsesNonDeterministicKeyCoder(this);
+ }
+
+ return input
+ .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+ .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsMap";
+ }
+ }
+
+ /**
+ * Specialized expansion for {@link
+ * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
+ * Flink runner in streaming mode.
+ */
+ private static class StreamingViewAsMultimap<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) {
+ }
+
+ @Override
+ public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ PCollectionView<Map<K, Iterable<V>>> view =
+ PCollectionViews.multimapView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+ try {
+ inputCoder.getKeyCoder().verifyDeterministic();
+ } catch (Coder.NonDeterministicException e) {
+// runner.recordViewUsesNonDeterministicKeyCoder(this);
+ }
+
+ return input
+ .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+ .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsMultimap";
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
+ * Flink runner in streaming mode.
+ */
+ private static class StreamingViewAsList<T>
+ extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingViewAsList(View.AsList<T> transform) {}
+
+ @Override
+ public PCollectionView<List<T>> apply(PCollection<T> input) {
+ PCollectionView<List<T>> view =
+ PCollectionViews.listView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateFlinkPCollectionView.<T, List<T>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsList";
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
+ * Flink runner in streaming mode.
+ */
+ private static class StreamingViewAsIterable<T>
+ extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingViewAsIterable(View.AsIterable<T> transform) { }
+
+ @Override
+ public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+ PCollectionView<Iterable<T>> view =
+ PCollectionViews.iterableView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateFlinkPCollectionView.<T, Iterable<T>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsIterable";
+ }
+ }
+
+ private static class WrapAsList<T> extends OldDoFn<T, List<T>> {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(Arrays.asList(c.element()));
+ }
+ }
+
+ /**
+ * Specialized expansion for
+ * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton} for the
+ * Flink runner in streaming mode.
+ */
+ private static class StreamingViewAsSingleton<T>
+ extends PTransform<PCollection<T>, PCollectionView<T>> {
+ private View.AsSingleton<T> transform;
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ public PCollectionView<T> apply(PCollection<T> input) {
+ Combine.Globally<T, T> combine = Combine.globally(
+ new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+ if (!transform.hasDefaultValue()) {
+ combine = combine.withoutDefaults();
+ }
+ return input.apply(combine.asSingletonView());
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsSingleton";
+ }
+
+ private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+ private boolean hasDefaultValue;
+ private T defaultValue;
+
+ SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+ this.hasDefaultValue = hasDefaultValue;
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public T apply(T left, T right) {
+ throw new IllegalArgumentException("PCollection with more than one element "
+ + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+ + "combine the PCollection into a single value");
+ }
+
+ @Override
+ public T identity() {
+ if (hasDefaultValue) {
+ return defaultValue;
+ } else {
+ throw new IllegalArgumentException(
+ "Empty PCollection accessed as a singleton view. "
+ + "Consider setting withDefault to provide a default value");
+ }
+ }
+ }
+ }
+
+ private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
+ extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+ Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingCombineGloballyAsSingletonView(
+ Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ public PCollectionView<OutputT> apply(PCollection<InputT> input) {
+ PCollection<OutputT> combined =
+ input.apply(Combine.globally(transform.getCombineFn())
+ .withoutDefaults()
+ .withFanout(transform.getFanout()));
+
+ PCollectionView<OutputT> view = PCollectionViews.singletonView(
+ combined.getPipeline(),
+ combined.getWindowingStrategy(),
+ transform.getInsertDefault(),
+ transform.getInsertDefault()
+ ? transform.getCombineFn().defaultValue() : null,
+ combined.getCoder());
+ return combined
+ .apply(ParDo.of(new WrapAsList<OutputT>()))
+ .apply(CreateFlinkPCollectionView.<OutputT, OutputT>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingCombineGloballyAsSingletonView";
+ }
+ }
+
+ /**
+ * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+ *
+ * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
+ * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
+ * They require the input {@link PCollection} fits in memory.
+ * For a large {@link PCollection} this is expected to crash!
+ *
+ * @param <T> the type of elements to concatenate.
+ */
+ private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+ @Override
+ public List<T> createAccumulator() {
+ return new ArrayList<T>();
+ }
+
+ @Override
+ public List<T> addInput(List<T> accumulator, T input) {
+ accumulator.add(input);
+ return accumulator;
+ }
+
+ @Override
+ public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+ List<T> result = createAccumulator();
+ for (List<T> accumulator : accumulators) {
+ result.addAll(accumulator);
+ }
+ return result;
+ }
+
+ @Override
+ public List<T> extractOutput(List<T> accumulator) {
+ return accumulator;
+ }
+
+ @Override
+ public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+
+ @Override
+ public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+ }
+
+ /**
+ * Creates a primitive {@link PCollectionView}.
+ *
+ * <p>For internal use only by runner implementors.
+ *
+ * @param <ElemT> The type of the elements of the input PCollection
+ * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+ */
+ public static class CreateFlinkPCollectionView<ElemT, ViewT>
+ extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+ private PCollectionView<ViewT> view;
+
+ private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
+ this.view = view;
+ }
+
+ public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of(
+ PCollectionView<ViewT> view) {
+ return new CreateFlinkPCollectionView<>(view);
+ }
+
+ public PCollectionView<ViewT> getView() {
+ return view;
+ }
+
+ @Override
+ public PCollectionView<ViewT> apply(PCollection<List<ElemT>> input) {
+ return view;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index 460933f..2a82749 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -59,7 +59,9 @@ public class TestFlinkRunner extends PipelineRunner<FlinkRunnerResult> {
@Override
public FlinkRunnerResult run(Pipeline pipeline) {
try {
- return delegate.run(pipeline);
+ FlinkRunnerResult result = delegate.run(pipeline);
+
+ return result;
} catch (RuntimeException e) {
// Special case hack to pull out assertion errors from PAssert; instead there should
// probably be a better story along the lines of UserCodeException.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
index 2e655a3..3bb8c59 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
@@ -28,10 +28,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate the user-provided
- * {@link org.apache.beam.sdk.values.PCollection}-based job into a
+ * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate
+ * the user-provided {@link org.apache.beam.sdk.values.PCollection}-based job into a
* {@link org.apache.flink.streaming.api.datastream.DataStream} one.
- * */
+ *
+ */
public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
@@ -55,8 +56,10 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
PTransform<?, ?> transform = node.getTransform();
if (transform != null) {
- StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
- if (translator != null) {
+ StreamTransformTranslator<?> translator =
+ FlinkStreamingTransformTranslators.getTranslator(transform);
+
+ if (translator != null && applyCanTranslate(transform, node, translator)) {
applyStreamingTransform(transform, node, translator);
LOG.info(genSpaces(this.depth) + "translated-" + formatNodeName(node));
return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
@@ -79,10 +82,13 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
// currently visiting and translate it into its Flink alternative.
PTransform<?, ?> transform = node.getTransform();
- StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
- if (translator == null) {
+ StreamTransformTranslator<?> translator =
+ FlinkStreamingTransformTranslators.getTranslator(transform);
+
+ if (translator == null && applyCanTranslate(transform, node, translator)) {
LOG.info(node.getTransform().getClass().toString());
- throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
+ throw new UnsupportedOperationException(
+ "The transform " + transform + " is currently not supported.");
}
applyStreamingTransform(transform, node, translator);
}
@@ -92,7 +98,10 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
// do nothing here
}
- private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> transform, TransformTreeNode node, StreamTransformTranslator<?> translator) {
+ private <T extends PTransform<?, ?>> void applyStreamingTransform(
+ PTransform<?, ?> transform,
+ TransformTreeNode node,
+ StreamTransformTranslator<?> translator) {
@SuppressWarnings("unchecked")
T typedTransform = (T) transform;
@@ -106,13 +115,41 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
typedTranslator.translateNode(typedTransform, streamingContext);
}
+ private <T extends PTransform<?, ?>> boolean applyCanTranslate(
+ PTransform<?, ?> transform,
+ TransformTreeNode node,
+ StreamTransformTranslator<?> translator) {
+
+ @SuppressWarnings("unchecked")
+ T typedTransform = (T) transform;
+
+ @SuppressWarnings("unchecked")
+ StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
+
+ streamingContext.setCurrentTransform(AppliedPTransform.of(
+ node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform));
+
+ return typedTranslator.canTranslate(typedTransform, streamingContext);
+ }
+
/**
* The interface that every Flink translator of a Beam operator should implement.
* This interface is for <b>streaming</b> jobs. For examples of such translators see
* {@link FlinkStreamingTransformTranslators}.
*/
- public interface StreamTransformTranslator<Type extends PTransform> {
- void translateNode(Type transform, FlinkStreamingTranslationContext context);
+ abstract static class StreamTransformTranslator<T extends PTransform> {
+
+ /**
+ * Translate the given transform.
+ */
+ abstract void translateNode(T transform, FlinkStreamingTranslationContext context);
+
+ /**
+ * Returns true iff this translator can translate the given transform.
+ */
+ boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
+ return true;
+ }
}
private static String formatNodeName(TransformTreeNode node) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 8167623..6c2c703 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -18,31 +18,29 @@
package org.apache.beam.runners.flink.translation;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.types.FlinkCoder;
-import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.FlinkStreamingCreateFunction;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -53,7 +51,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.Reshuffle;
import org.apache.beam.sdk.util.SystemReduceFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -68,24 +66,27 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -112,19 +113,22 @@ public class FlinkStreamingTransformTranslators {
// here you can find all the available translators.
static {
- TRANSLATORS.put(Create.Values.class, new CreateStreamingTranslator());
TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
- TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
+ TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
- TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
+ TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
+ TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
+ TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
+ TRANSLATORS.put(
+ FlinkRunner.CreateFlinkPCollectionView.class, new CreateViewStreamingTranslator());
+
+ TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming());
TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
- TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
- TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
}
public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
@@ -136,54 +140,8 @@ public class FlinkStreamingTransformTranslators {
// Transformation Implementations
// --------------------------------------------------------------------------------------------
- private static class CreateStreamingTranslator<OutputT> implements
- FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OutputT>> {
-
- @Override
- public void translateNode(
- Create.Values<OutputT> transform,
- FlinkStreamingTranslationContext context) {
-
- PCollection<OutputT> output = context.getOutput(transform);
- Iterable<OutputT> elements = transform.getElements();
-
- // we need to serialize the elements to byte arrays, since they might contain
- // elements that are not serializable by Java serialization. We deserialize them
- // in the FlatMap function using the Coder.
-
- List<byte[]> serializedElements = Lists.newArrayList();
- Coder<OutputT> elementCoder = output.getCoder();
- for (OutputT element: elements) {
- ByteArrayOutputStream bao = new ByteArrayOutputStream();
- try {
- elementCoder.encode(element, bao, Coder.Context.OUTER);
- serializedElements.add(bao.toByteArray());
- } catch (IOException e) {
- throw new RuntimeException("Could not serialize Create elements using Coder: " + e);
- }
- }
-
-
- DataStream<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1);
-
- FlinkStreamingCreateFunction<Integer, OutputT> createFunction =
- new FlinkStreamingCreateFunction<>(serializedElements, elementCoder);
-
- WindowedValue.ValueOnlyWindowedValueCoder<OutputT> windowCoder =
- WindowedValue.getValueOnlyCoder(elementCoder);
-
- TypeInformation<WindowedValue<OutputT>> outputType = new CoderTypeInformation<>(windowCoder);
-
- DataStream<WindowedValue<OutputT>> outputDataStream = initDataSet
- .flatMap(createFunction).returns(outputType);
-
- context.setOutputDataStream(output, outputDataStream);
- }
- }
-
-
private static class TextIOWriteBoundStreamingTranslator<T>
- implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
TextIO.Write.Bound<T>> {
private static final Logger LOG =
@@ -230,7 +188,7 @@ public class FlinkStreamingTransformTranslators {
}
private static class WriteSinkStreamingTranslator<T>
- implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
@Override
public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
@@ -254,29 +212,8 @@ public class FlinkStreamingTransformTranslators {
}
}
- private static class BoundedReadSourceTranslator<T>
- implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
-
- @Override
- public void translateNode(Read.Bounded<T> transform, FlinkStreamingTranslationContext context) {
-
- BoundedSource<T> boundedSource = transform.getSource();
- PCollection<T> output = context.getOutput(transform);
-
- TypeInformation<WindowedValue<T>> typeInfo = context.getTypeInfo(output);
-
- DataStream<WindowedValue<T>> source = context.getExecutionEnvironment().createInput(
- new SourceInputFormat<>(
- boundedSource,
- context.getPipelineOptions()),
- typeInfo);
-
- context.setOutputDataStream(output, source);
- }
- }
-
private static class UnboundedReadSourceTranslator<T>
- implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
@Override
public void translateNode(
@@ -290,7 +227,8 @@ public class FlinkStreamingTransformTranslators {
UnboundedFlinkSource<T> flinkSourceFunction =
(UnboundedFlinkSource<T>) transform.getSource();
- final AssignerWithPeriodicWatermarks<T> flinkAssigner = flinkSourceFunction.getFlinkTimestampAssigner();
+ final AssignerWithPeriodicWatermarks<T> flinkAssigner =
+ flinkSourceFunction.getFlinkTimestampAssigner();
DataStream<T> flinkSource = context.getExecutionEnvironment()
.addSource(flinkSourceFunction.getFlinkSource());
@@ -332,8 +270,37 @@ public class FlinkStreamingTransformTranslators {
}
}
+ private static class BoundedReadSourceTranslator<T>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
+
+ @Override
+ public void translateNode(
+ Read.Bounded<T> transform,
+ FlinkStreamingTranslationContext context) {
+ PCollection<T> output = context.getOutput(transform);
+
+ DataStream<WindowedValue<T>> source;
+ try {
+ transform.getSource();
+ BoundedSourceWrapper<T> sourceWrapper =
+ new BoundedSourceWrapper<>(
+ context.getPipelineOptions(),
+ transform.getSource(),
+ context.getExecutionEnvironment().getParallelism());
+ source = context
+ .getExecutionEnvironment()
+ .addSource(sourceWrapper).name(transform.getName());
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Error while translating BoundedSource: " + transform.getSource(), e);
+ }
+
+ context.setOutputDataStream(output, source);
+ }
+ }
+
private static class ParDoBoundStreamingTranslator<InputT, OutputT>
- implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
ParDo.Bound<InputT, OutputT>> {
@Override
@@ -347,27 +314,292 @@ public class FlinkStreamingTransformTranslators {
TypeInformation<WindowedValue<OutputT>> typeInfo =
context.getTypeInfo(context.getOutput(transform));
- DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator = new DoFnOperator<>(
- transform.getFn(),
- new TupleTag<OutputT>("main output"),
- Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(),
- windowingStrategy,
- new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
- context.getPipelineOptions());
+ List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+
+ @SuppressWarnings("unchecked")
+ PCollection<InputT> inputPCollection = (PCollection<InputT>) context.getInput(transform);
+
+ TypeInformation<WindowedValue<InputT>> inputTypeInfo =
+ context.getTypeInfo(inputPCollection);
+
+ if (sideInputs.isEmpty()) {
+ DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
+ new DoFnOperator<>(
+ transform.getFn(),
+ inputTypeInfo,
+ new TupleTag<OutputT>("main output"),
+ Collections.<TupleTag<?>>emptyList(),
+ new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(),
+ windowingStrategy,
+ new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+ Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+ context.getPipelineOptions());
+
+ DataStream<WindowedValue<InputT>> inputDataStream =
+ context.getInputDataStream(context.getInput(transform));
+
+ SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream = inputDataStream
+ .transform(transform.getName(), typeInfo, doFnOperator);
+
+ context.setOutputDataStream(context.getOutput(transform), outDataStream);
+ } else {
+ Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
+ transformSideInputs(sideInputs, context);
+
+ DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
+ new DoFnOperator<>(
+ transform.getFn(),
+ inputTypeInfo,
+ new TupleTag<OutputT>("main output"),
+ Collections.<TupleTag<?>>emptyList(),
+ new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(),
+ windowingStrategy,
+ transformedSideInputs.f0,
+ sideInputs,
+ context.getPipelineOptions());
+
+ DataStream<WindowedValue<InputT>> inputDataStream =
+ context.getInputDataStream(context.getInput(transform));
+
+ SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream = inputDataStream
+ .connect(transformedSideInputs.f1.broadcast())
+ .transform(transform.getName(), typeInfo, doFnOperator);
+
+ context.setOutputDataStream(context.getOutput(transform), outDataStream);
+
+ }
+ }
+ }
+
+ /**
+ * Wraps each element in a {@link RawUnionValue} with the given tag id.
+ */
+ private static class ToRawUnion<T> implements MapFunction<T, RawUnionValue> {
+ private final int intTag;
+
+ public ToRawUnion(int intTag) {
+ this.intTag = intTag;
+ }
+
+ @Override
+ public RawUnionValue map(T o) throws Exception {
+ return new RawUnionValue(intTag, o);
+ }
+ }
+
+ private static Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>>
+ transformSideInputs(
+ Collection<PCollectionView<?>> sideInputs,
+ FlinkStreamingTranslationContext context) {
+
+ // collect all side inputs
+ Map<TupleTag<?>, Integer> tagToIntMapping = new HashMap<>();
+ Map<Integer, PCollectionView<?>> intToViewMapping = new HashMap<>();
+ int count = 0;
+ for (PCollectionView<?> sideInput: sideInputs) {
+ TupleTag<?> tag = sideInput.getTagInternal();
+ intToViewMapping.put(count, sideInput);
+ tagToIntMapping.put(tag, count);
+ count++;
+ Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
+ }
+
+
+ List<Coder<?>> inputCoders = new ArrayList<>();
+ for (PCollectionView<?> sideInput: sideInputs) {
+ DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
+ TypeInformation<Object> tpe = sideInputStream.getType();
+ if (!(tpe instanceof CoderTypeInformation)) {
+ throw new IllegalStateException(
+ "Input Stream TypeInformation is no CoderTypeInformation.");
+ }
+
+ Coder<?> coder = ((CoderTypeInformation) tpe).getCoder();
+ inputCoders.add(coder);
+ }
+
+ UnionCoder unionCoder = UnionCoder.of(inputCoders);
+
+ CoderTypeInformation<RawUnionValue> unionTypeInformation =
+ new CoderTypeInformation<>(unionCoder);
+
+ // transform each side input to RawUnionValue and union them
+ DataStream<RawUnionValue> sideInputUnion = null;
+
+ for (PCollectionView<?> sideInput: sideInputs) {
+ TupleTag<?> tag = sideInput.getTagInternal();
+ final int intTag = tagToIntMapping.get(tag);
+ DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
+ DataStream<RawUnionValue> unionValueStream =
+ sideInputStream.map(new ToRawUnion<>(intTag)).returns(unionTypeInformation);
+
+ if (sideInputUnion == null) {
+ sideInputUnion = unionValueStream;
+ } else {
+ sideInputUnion = sideInputUnion.union(unionValueStream);
+ }
+ }
+
+ if (sideInputUnion == null) {
+ throw new IllegalStateException("No unioned side inputs, this indicates a bug.");
+ }
+
+ return new Tuple2<>(intToViewMapping, sideInputUnion);
+ }
+
+
+ private static class ParDoBoundMultiStreamingTranslator<InputT, OutputT>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ ParDo.BoundMulti<InputT, OutputT>> {
+
+ @Override
+ public void translateNode(
+ ParDo.BoundMulti<InputT, OutputT> transform,
+ FlinkStreamingTranslationContext context) {
+
+ // we assume that the transformation does not change the windowing strategy.
+ WindowingStrategy<?, ?> windowingStrategy =
+ context.getInput(transform).getWindowingStrategy();
+
+ Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
+
+ Map<TupleTag<?>, Integer> tagsToLabels =
+ transformTupleTagsToLabels(transform.getMainOutputTag(), outputs.keySet());
+
+ List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+
+ SingleOutputStreamOperator<RawUnionValue> unionOutputStream;
+
+ @SuppressWarnings("unchecked")
+ PCollection<InputT> inputPCollection = (PCollection<InputT>) context.getInput(transform);
+
+ TypeInformation<WindowedValue<InputT>> inputTypeInfo =
+ context.getTypeInfo(inputPCollection);
+
+ if (sideInputs.isEmpty()) {
+ DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
+ new DoFnOperator<>(
+ transform.getFn(),
+ inputTypeInfo,
+ transform.getMainOutputTag(),
+ transform.getSideOutputTags().getAll(),
+ new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+ windowingStrategy,
+ new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+ Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+ context.getPipelineOptions());
+
+ UnionCoder outputUnionCoder = createUnionCoder(outputs.values());
+
+ CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
+ new CoderTypeInformation<>(outputUnionCoder);
+
+ DataStream<WindowedValue<InputT>> inputDataStream =
+ context.getInputDataStream(context.getInput(transform));
+
+ unionOutputStream = inputDataStream
+ .transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
+
+ } else {
+ Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
+ transformSideInputs(sideInputs, context);
+
+ DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
+ new DoFnOperator<>(
+ transform.getFn(),
+ inputTypeInfo,
+ transform.getMainOutputTag(),
+ transform.getSideOutputTags().getAll(),
+ new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+ windowingStrategy,
+ transformedSideInputs.f0,
+ sideInputs,
+ context.getPipelineOptions());
+
+ UnionCoder outputUnionCoder = createUnionCoder(outputs.values());
+
+ CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
+ new CoderTypeInformation<>(outputUnionCoder);
+
+ DataStream<WindowedValue<InputT>> inputDataStream =
+ context.getInputDataStream(context.getInput(transform));
+
+ unionOutputStream = inputDataStream
+ .connect(transformedSideInputs.f1.broadcast())
+ .transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
+ }
+
+ for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
+ final int outputTag = tagsToLabels.get(output.getKey());
+
+ TypeInformation outputTypeInfo =
+ context.getTypeInfo(output.getValue());
+
+ @SuppressWarnings("unchecked")
+ DataStream filtered =
+ unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
+ @Override
+ public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
+ System.out.println("FILTERING: " + value);
+ if (value.getUnionTag() == outputTag) {
+ System.out.println("EMITTING VALUE: " + value);
+ out.collect(value.getValue());
+ }
+ }
+ }).returns(outputTypeInfo);
- DataStream<WindowedValue<InputT>> inputDataStream =
+ context.setOutputDataStream(output.getValue(), filtered);
+ }
+ }
+
+ private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
+ TupleTag<?> mainTag,
+ Set<TupleTag<?>> secondaryTags) {
+
+ Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
+ int count = 0;
+ tagToLabelMap.put(mainTag, count++);
+ for (TupleTag<?> tag : secondaryTags) {
+ if (!tagToLabelMap.containsKey(tag)) {
+ tagToLabelMap.put(tag, count++);
+ }
+ }
+ return tagToLabelMap;
+ }
+
+ private UnionCoder createUnionCoder(Collection<PCollection<?>> taggedCollections) {
+ List<Coder<?>> outputCoders = Lists.newArrayList();
+ for (PCollection<?> coll : taggedCollections) {
+ WindowedValue.FullWindowedValueCoder<?> windowedValueCoder =
+ WindowedValue.getFullCoder(
+ coll.getCoder(),
+ coll.getWindowingStrategy().getWindowFn().windowCoder());
+ outputCoders.add(windowedValueCoder);
+ }
+ return UnionCoder.of(outputCoders);
+ }
+ }
+
+ private static class CreateViewStreamingTranslator<ElemT, ViewT>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ FlinkRunner.CreateFlinkPCollectionView<ElemT, ViewT>> {
+
+ @Override
+ public void translateNode(
+ FlinkRunner.CreateFlinkPCollectionView<ElemT, ViewT> transform,
+ FlinkStreamingTranslationContext context) {
+ // just forward
+ DataStream<WindowedValue<List<ElemT>>> inputDataSet =
context.getInputDataStream(context.getInput(transform));
- SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream =
- inputDataStream.transform(transform.getName(), typeInfo, doFnOperator);
+ PCollectionView<ViewT> input = transform.getView();
- context.setOutputDataStream(context.getOutput(transform), outDataStream);
+ context.setOutputDataStream(input, inputDataSet);
}
}
private static class WindowBoundTranslator<T>
- implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
@Override
public void translateNode(
@@ -384,17 +616,26 @@ public class FlinkStreamingTransformTranslators {
OldDoFn<T, T> windowAssignerDoFn =
createWindowAssigner(windowingStrategy.getWindowFn());
+ @SuppressWarnings("unchecked")
+ PCollection<T> inputPCollection = context.getInput(transform);
+
+ TypeInformation<WindowedValue<T>> inputTypeInfo =
+ context.getTypeInfo(inputPCollection);
+
DoFnOperator<T, T, WindowedValue<T>> doFnOperator = new DoFnOperator<>(
windowAssignerDoFn,
+ inputTypeInfo,
new TupleTag<T>("main output"),
Collections.<TupleTag<?>>emptyList(),
new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<T>>(),
windowingStrategy,
- new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
+ new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+ Collections.<PCollectionView<?>>emptyList(), /* side inputs */
context.getPipelineOptions());
DataStream<WindowedValue<T>> inputDataStream =
context.getInputDataStream(context.getInput(transform));
+
SingleOutputStreamOperator<WindowedValue<T>> outDataStream = inputDataStream
.transform(transform.getName(), typeInfo, doFnOperator);
@@ -433,8 +674,25 @@ public class FlinkStreamingTransformTranslators {
}
}
+ private static class ReshuffleTranslatorStreaming<K, InputT>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Reshuffle<K, InputT>> {
+
+ @Override
+ public void translateNode(
+ Reshuffle<K, InputT> transform,
+ FlinkStreamingTranslationContext context) {
+
+ DataStream<WindowedValue<KV<K, InputT>>> inputDataSet =
+ context.getInputDataStream(context.getInput(transform));
+
+ context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance());
+
+ }
+ }
+
+
private static class GroupByKeyTranslator<K, InputT>
- implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> {
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> {
@Override
public void translateNode(
@@ -456,10 +714,11 @@ public class FlinkStreamingTransformTranslators {
DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
-
- WindowedValue.ValueOnlyWindowedValueCoder<
- SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
- WindowedValue.getValueOnlyCoder(workItemCoder);
+ WindowedValue.
+ FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
+ WindowedValue.getFullCoder(
+ workItemCoder,
+ input.getWindowingStrategy().getWindowFn().windowCoder());
CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
new CoderTypeInformation<>(windowedWorkItemCoder);
@@ -484,17 +743,16 @@ public class FlinkStreamingTransformTranslators {
WindowedValue<KV<K, Iterable<InputT>>>> outputManagerFactory =
new DoFnOperator.DefaultOutputManagerFactory<>();
- WindowDoFnOperator<
- K,
- InputT,
- KV<K, Iterable<InputT>>> doFnOperator =
+ WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
new WindowDoFnOperator<>(
reduceFn,
+ (TypeInformation) workItemTypeInfo,
new TupleTag<KV<K, Iterable<InputT>>>("main output"),
Collections.<TupleTag<?>>emptyList(),
outputManagerFactory,
windowingStrategy,
- new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
+ new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+ Collections.<PCollectionView<?>>emptyList(), /* side inputs */
context.getPipelineOptions(),
inputKvCoder.getKeyCoder());
@@ -514,8 +772,25 @@ public class FlinkStreamingTransformTranslators {
}
private static class CombinePerKeyTranslator<K, InputT, OutputT>
- implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
- Combine.PerKey<K, InputT, OutputT>> {
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ Combine.PerKey<K, InputT, OutputT>> {
+
+ @Override
+ boolean canTranslate(
+ Combine.PerKey<K, InputT, OutputT> transform,
+ FlinkStreamingTranslationContext context) {
+
+ // if we have a merging window strategy and side inputs we cannot
+ // translate as a proper combine. We have to group and then run the combine
+ // over the final grouped values.
+ PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<?, BoundedWindow> windowingStrategy =
+ (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
+
+ return windowingStrategy.getWindowFn().isNonMerging() || transform.getSideInputs().isEmpty();
+ }
@Override
public void translateNode(
@@ -537,10 +812,11 @@ public class FlinkStreamingTransformTranslators {
DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
-
- WindowedValue.ValueOnlyWindowedValueCoder<
- SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
- WindowedValue.getValueOnlyCoder(workItemCoder);
+ WindowedValue.
+ FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
+ WindowedValue.getFullCoder(
+ workItemCoder,
+ input.getWindowingStrategy().getWindowFn().windowCoder());
CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
new CoderTypeInformation<>(windowedWorkItemCoder);
@@ -560,32 +836,78 @@ public class FlinkStreamingTransformTranslators {
AppliedCombineFn.withInputCoder(
transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder));
-
- OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> windowDoFn =
- GroupAlsoByWindowViaWindowSetDoFn.create(windowingStrategy, reduceFn);
-
-
TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
context.getTypeInfo(context.getOutput(transform));
- WindowDoFnOperator<K, InputT, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> doFnOperator =
- new WindowDoFnOperator<>(
- windowDoFn,
- new TupleTag<KV<K, OutputT>>("main output"),
- Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
- windowingStrategy,
- new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
- context.getPipelineOptions(),
- inputKvCoder.getKeyCoder());
-
- // our operator excepts WindowedValue<KeyedWorkItem> while our input stream
- // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
- @SuppressWarnings("unchecked")
- SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream = keyedWorkItemStream
- .transform(transform.getName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator);
+ List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+
+ if (sideInputs.isEmpty()) {
+
+ WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+ new WindowDoFnOperator<>(
+ reduceFn,
+ (TypeInformation) workItemTypeInfo,
+ new TupleTag<KV<K, OutputT>>("main output"),
+ Collections.<TupleTag<?>>emptyList(),
+ new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
+ windowingStrategy,
+ new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+ Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+ context.getPipelineOptions(),
+ inputKvCoder.getKeyCoder());
+
+ // our operator expects WindowedValue<KeyedWorkItem> while our input stream
+ // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
+ @SuppressWarnings("unchecked")
+ SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
+ keyedWorkItemStream.transform(
+ transform.getName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator);
- context.setOutputDataStream(context.getOutput(transform), outDataStream);
+ context.setOutputDataStream(context.getOutput(transform), outDataStream);
+ } else {
+ Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs =
+ transformSideInputs(sideInputs, context);
+
+ WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+ new WindowDoFnOperator<>(
+ reduceFn,
+ (TypeInformation) workItemTypeInfo,
+ new TupleTag<KV<K, OutputT>>("main output"),
+ Collections.<TupleTag<?>>emptyList(),
+ new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
+ windowingStrategy,
+ transformSideInputs.f0,
+ sideInputs,
+ context.getPipelineOptions(),
+ inputKvCoder.getKeyCoder());
+
+ // we have to manually construct the two-input transform because we're not
+ // allowed to have only one input keyed, normally.
+
+ TwoInputTransformation<
+ WindowedValue<SingletonKeyedWorkItem<K, InputT>>,
+ RawUnionValue,
+ WindowedValue<KV<K, OutputT>>> rawFlinkTransform = new TwoInputTransformation<>(
+ keyedWorkItemStream.getTransformation(),
+ transformSideInputs.f1.broadcast().getTransformation(),
+ transform.getName(),
+ (TwoInputStreamOperator) doFnOperator,
+ outputTypeInfo,
+ keyedWorkItemStream.getParallelism());
+
+ rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
+ rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null);
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
+ new SingleOutputStreamOperator(
+ keyedWorkItemStream.getExecutionEnvironment(),
+ rawFlinkTransform) {}; // we have to cheat around the ctor being protected
+
+ keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
+
+ context.setOutputDataStream(context.getOutput(transform), outDataStream);
+ }
}
private static class ToKeyedWorkItem<K, InputT>
@@ -595,21 +917,27 @@ public class FlinkStreamingTransformTranslators {
@Override
public void flatMap(
- WindowedValue<KV<K, InputT>> in,
+ WindowedValue<KV<K, InputT>> inWithMultipleWindows,
Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception {
- SingletonKeyedWorkItem<K, InputT> workItem =
- new SingletonKeyedWorkItem<>(
- in.getValue().getKey(),
- in.withValue(in.getValue().getValue()));
-
- out.collect(WindowedValue.valueInEmptyWindows(workItem));
+ // we need to wrap one work item per window for now
+ // since otherwise the PushbackSideInputRunner will not correctly
+ // determine whether side inputs are ready
+ for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) {
+ SingletonKeyedWorkItem<K, InputT> workItem =
+ new SingletonKeyedWorkItem<>(
+ in.getValue().getKey(),
+ in.withValue(in.getValue().getValue()));
+
+ in.withValue(workItem);
+ out.collect(in.withValue(workItem));
+ }
}
}
}
private static class FlattenPCollectionTranslator<T>
- implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
Flatten.FlattenPCollectionList<T>> {
@Override
@@ -617,91 +945,38 @@ public class FlinkStreamingTransformTranslators {
Flatten.FlattenPCollectionList<T> transform,
FlinkStreamingTranslationContext context) {
List<PCollection<T>> allInputs = context.getInput(transform).getAll();
- DataStream<T> result = null;
- for (PCollection<T> collection : allInputs) {
- DataStream<T> current = context.getInputDataStream(collection);
- result = (result == null) ? current : result.union(current);
- }
- context.setOutputDataStream(context.getOutput(transform), result);
- }
- }
-
- private static class ParDoBoundMultiStreamingTranslator<InputT, OutputT>
- implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
- ParDo.BoundMulti<InputT, OutputT>> {
-
- @Override
- public void translateNode(
- ParDo.BoundMulti<InputT, OutputT> transform,
- FlinkStreamingTranslationContext context) {
-
- // we assume that the transformation does not change the windowing strategy.
- WindowingStrategy<?, ?> windowingStrategy =
- context.getInput(transform).getWindowingStrategy();
-
- Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
-
- Map<TupleTag<?>, Integer> tagsToLabels =
- transformTupleTagsToLabels(transform.getMainOutputTag(), outputs.keySet());
-
- DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = new DoFnOperator<>(
- transform.getFn(),
- transform.getMainOutputTag(),
- transform.getSideOutputTags().getAll(),
- new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
- windowingStrategy,
- new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
- context.getPipelineOptions());
-
- UnionCoder unionCoder = createUnionCoder(outputs.values());
- CoderTypeInformation<RawUnionValue> unionTypeInformation =
- new CoderTypeInformation<>(unionCoder);
+ if (allInputs.isEmpty()) {
- DataStream<WindowedValue<InputT>> inputDataStream =
- context.getInputDataStream(context.getInput(transform));
+ // create an empty dummy source to satisfy downstream operations
+ // we cannot create an empty source in Flink, therefore we have to
+ // add the flatMap that simply never forwards the single element
+ DataStreamSource<String> dummySource =
+ context.getExecutionEnvironment().fromElements("dummy");
- SingleOutputStreamOperator<RawUnionValue> unionStream = inputDataStream
- .transform(transform.getName(), unionTypeInformation, doFnOperator);
-
- for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
- final int outputTag = tagsToLabels.get(output.getKey());
-
- TypeInformation outputTypeInfo =
- context.getTypeInfo(output.getValue());
-
- unionStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
- @Override
- public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
- if (value.getUnionTag() == outputTag) {
- out.collect(value.getValue());
- }
- }
- }).returns(outputTypeInfo);
- }
- }
-
- private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
- TupleTag<?> mainTag,
- Set<TupleTag<?>> secondaryTags) {
+ DataStream<WindowedValue<T>> result = dummySource.flatMap(
+ new FlatMapFunction<String, WindowedValue<T>>() {
+ @Override
+ public void flatMap(
+ String s,
+ Collector<WindowedValue<T>> collector) throws Exception {
+ // never return anything
+ }
+ }).returns(
+ new CoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ (Coder<T>) VoidCoder.of(),
+ GlobalWindow.Coder.INSTANCE)));
+ context.setOutputDataStream(context.getOutput(transform), result);
- Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
- int count = 0;
- tagToLabelMap.put(mainTag, count++);
- for (TupleTag<?> tag : secondaryTags) {
- if (!tagToLabelMap.containsKey(tag)) {
- tagToLabelMap.put(tag, count++);
+ } else {
+ DataStream<T> result = null;
+ for (PCollection<T> collection : allInputs) {
+ DataStream<T> current = context.getInputDataStream(collection);
+ result = (result == null) ? current : result.union(current);
}
+ context.setOutputDataStream(context.getOutput(transform), result);
}
- return tagToLabelMap;
- }
-
- private UnionCoder createUnionCoder(Collection<PCollection<?>> taggedCollections) {
- List<Coder<?>> outputCoders = Lists.newArrayList();
- for (PCollection<?> coll : taggedCollections) {
- outputCoders.add(coll.getCoder());
- }
- return UnionCoder.of(outputCoders);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
index 71cc6b7..61abf9a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
@@ -40,6 +40,10 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi
this.coder = coder;
}
+ public Coder<T> getCoder() {
+ return coder;
+ }
+
@Override
public boolean isBasicType() {
return false;