Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/24 19:52:49 UTC

[11/17] incubator-beam git commit: [BEAM-102] Add Side Inputs in Flink Streaming Runner

[BEAM-102] Add Side Inputs in Flink Streaming Runner

This adds a generic SideInputHandler to runners-core. It is currently used
only by the Flink runner, but it can be reused by other runner
implementations.
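
For reference, a minimal sketch of the intended usage, based on the API
added below (view, window, timestamp and contents are placeholders, with
view assumed to be a PCollectionView<Iterable<String>> and contents an
Iterable<?>; InMemoryStateInternals is the same non-keyed state backend
the new tests use):

  SideInputHandler handler = new SideInputHandler(
      ImmutableList.<PCollectionView<?>>of(view),
      InMemoryStateInternals.<Void>forKey(null));

  // the runner pushes the materialized contents of a side input as they arrive
  handler.addSideInputValue(
      view, WindowedValue.of(contents, timestamp, window, PaneInfo.NO_FIRING));

  // consumers check readiness before reading a window's contents
  if (handler.isReady(view, window)) {
    Iterable<String> elements = handler.get(view, window);
  }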


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dfbdc6c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dfbdc6c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dfbdc6c2

Branch: refs/heads/master
Commit: dfbdc6c2bbef5e749bfc1800f97d21377f0c713d
Parents: ff34f9e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jul 11 14:08:35 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Aug 24 12:46:24 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SideInputHandler.java     | 240 ++++++
 .../beam/runners/core/SideInputHandlerTest.java | 222 ++++++
 .../apache/beam/runners/flink/FlinkRunner.java  | 386 +++++++++-
 .../beam/runners/flink/TestFlinkRunner.java     |   4 +-
 .../FlinkStreamingPipelineTranslator.java       |  59 +-
 .../FlinkStreamingTransformTranslators.java     | 727 +++++++++++++------
 .../translation/types/CoderTypeInformation.java |   4 +
 .../wrappers/streaming/DoFnOperator.java        | 282 ++++++-
 .../wrappers/streaming/WindowDoFnOperator.java  |  47 +-
 .../streaming/io/BoundedSourceWrapper.java      | 219 ++++++
 .../io/FlinkStreamingCreateFunction.java        |  56 --
 .../flink/streaming/DoFnOperatorTest.java       | 328 +++++++++
 .../streaming/UnboundedSourceWrapperTest.java   |   2 +-
 .../beam/sdk/transforms/join/RawUnionValue.java |  25 +
 14 files changed, 2270 insertions(+), 331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
new file mode 100644
index 0000000..6550251
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SetCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+/**
+ * Generic side input handler that uses {@link StateInternals} to store all data. Both the actual
+ * side-input data and data about the windows for which we have side inputs available are stored
+ * using {@code StateInternals}.
+ *
+ * <p>The given {@code StateInternals} must not be scoped to an element key. The state
+ * must instead be scoped to one key group for which the side input is being managed.
+ *
+ * <p>This is useful for runners that transmit the side-input elements in-band, as opposed
+ * to Dataflow, which uses an external service for managing side inputs.
+ *
+ * <p>Note: storing the available windows in extra state is redundant for now, but in the
+ * future we will want to know which windows are available so that we can garbage-collect
+ * side-input data. Currently this never cleans up side-input data, because we have no
+ * way of knowing when we reach the GC horizon.
+ */
+public class SideInputHandler implements ReadyCheckingSideInputReader {
+
+  /** The list of side inputs that we're handling. */
+  protected final Collection<PCollectionView<?>> sideInputs;
+
+  /** State internals that are scoped not to the key of a value but instead to one key group. */
+  private final StateInternals<Void> stateInternals;
+
+  /**
+   * A state tag for each side input that we handle. The state is used to track
+   * for which windows we have input available.
+   */
+  private final Map<
+      PCollectionView<?>,
+      StateTag<
+          Object,
+          AccumulatorCombiningState<
+              BoundedWindow,
+              Set<BoundedWindow>,
+              Set<BoundedWindow>>>> availableWindowsTags;
+
+  /**
+   * State tag for the actual contents of each side input per window.
+   */
+  private final Map<
+      PCollectionView<?>,
+      StateTag<Object, ValueState<Iterable<WindowedValue<?>>>>> sideInputContentsTags;
+
+  /**
+   * Creates a new {@code SideInputHandler} for the given side inputs that uses
+   * the given {@code StateInternals} to store side input data and side-input meta data.
+   */
+  public SideInputHandler(
+      Collection<PCollectionView<?>> sideInputs,
+      StateInternals<Void> stateInternals) {
+    this.sideInputs = sideInputs;
+    this.stateInternals = stateInternals;
+    this.availableWindowsTags = new HashMap<>();
+    this.sideInputContentsTags = new HashMap<>();
+
+    for (PCollectionView<?> sideInput: sideInputs) {
+
+      @SuppressWarnings("unchecked")
+      Coder<BoundedWindow> windowCoder =
+          (Coder<BoundedWindow>) sideInput
+              .getWindowingStrategyInternal()
+              .getWindowFn()
+              .windowCoder();
+
+      StateTag<
+          Object,
+          AccumulatorCombiningState<
+              BoundedWindow,
+              Set<BoundedWindow>,
+              Set<BoundedWindow>>> availableTag = StateTags.combiningValue(
+          "side-input-available-windows-" + sideInput.getTagInternal().getId(),
+          SetCoder.of(windowCoder),
+          new WindowSetCombineFn());
+
+      availableWindowsTags.put(sideInput, availableTag);
+
+      Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
+      StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag =
+          StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(), coder);
+      sideInputContentsTags.put(sideInput, stateTag);
+    }
+  }
+
+  /**
+   * Add the given value to the internal side-input store of the given side input. This
+   * might change the result of {@link #isReady(PCollectionView, BoundedWindow)} for that side
+   * input.
+   */
+  public void addSideInputValue(
+      PCollectionView<?> sideInput,
+      WindowedValue<Iterable<?>> value) {
+
+    @SuppressWarnings("unchecked")
+    Coder<BoundedWindow> windowCoder =
+        (Coder<BoundedWindow>) sideInput
+            .getWindowingStrategyInternal()
+            .getWindowFn()
+            .windowCoder();
+
+    // reify the WindowedValue
+    List<WindowedValue<?>> inputWithReifiedWindows = new ArrayList<>();
+    for (Object e: value.getValue()) {
+      inputWithReifiedWindows.add(value.withValue(e));
+    }
+
+    StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag =
+        sideInputContentsTags.get(sideInput);
+
+    for (BoundedWindow window: value.getWindows()) {
+      stateInternals
+          .state(StateNamespaces.window(windowCoder, window), stateTag)
+          .write(inputWithReifiedWindows);
+
+      stateInternals
+          .state(StateNamespaces.global(), availableWindowsTags.get(sideInput))
+          .add(window);
+    }
+  }
+
+  @Nullable
+  @Override
+  public <T> T get(PCollectionView<T> sideInput, BoundedWindow window) {
+
+    if (!isReady(sideInput, window)) {
+      throw new IllegalStateException(
+          "Side input " + sideInput + " is not ready for window " + window);
+    }
+
+    @SuppressWarnings("unchecked")
+    Coder<BoundedWindow> windowCoder =
+        (Coder<BoundedWindow>) sideInput
+            .getWindowingStrategyInternal()
+            .getWindowFn()
+            .windowCoder();
+
+    StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag =
+        sideInputContentsTags.get(sideInput);
+
+    ValueState<Iterable<WindowedValue<?>>> state =
+        stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
+
+    Iterable<WindowedValue<?>> elements = state.read();
+
+    return sideInput.fromIterableInternal(elements);
+  }
+
+  @Override
+  public boolean isReady(PCollectionView<?> sideInput, BoundedWindow window) {
+    Set<BoundedWindow> readyWindows =
+        stateInternals.state(StateNamespaces.global(), availableWindowsTags.get(sideInput)).read();
+
+    boolean result = readyWindows != null && readyWindows.contains(window);
+    return result;
+  }
+
+  @Override
+  public <T> boolean contains(PCollectionView<T> view) {
+    return sideInputs.contains(view);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return sideInputs.isEmpty();
+  }
+
+  /**
+   * For keeping track of the windows for which we have available side input.
+   */
+  private static class WindowSetCombineFn
+      extends Combine.CombineFn<BoundedWindow, Set<BoundedWindow>, Set<BoundedWindow>> {
+
+    @Override
+    public Set<BoundedWindow> createAccumulator() {
+      return new HashSet<>();
+    }
+
+    @Override
+    public Set<BoundedWindow> addInput(Set<BoundedWindow> accumulator, BoundedWindow input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public Set<BoundedWindow> mergeAccumulators(Iterable<Set<BoundedWindow>> accumulators) {
+      Set<BoundedWindow> result = new HashSet<>();
+      for (Set<BoundedWindow> acc: accumulators) {
+        result.addAll(acc);
+      }
+      return result;
+    }
+
+    @Override
+    public Set<BoundedWindow> extractOutput(Set<BoundedWindow> accumulator) {
+      return accumulator;
+    }
+  }
+}
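
One consequence of the ReadyCheckingSideInputReader contract is that a
consumer has to buffer main-input elements until isReady() returns true for
every side input in each of the element's windows. A sketch of that pattern
(roughly what the reworked DoFnOperator below does; pushedBack, views and
invokeDoFn are illustrative names, not APIs from this commit):

  private final List<WindowedValue<InputT>> pushedBack = new ArrayList<>();

  void processElement(WindowedValue<InputT> element) {
    for (BoundedWindow window : element.getWindows()) {
      for (PCollectionView<?> view : views) {
        if (!sideInputHandler.isReady(view, window)) {
          // buffer and retry once more side-input data has arrived
          pushedBack.add(element);
          return;
        }
      }
    }
    invokeDoFn(element); // all side inputs are ready for all windows
  }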

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
new file mode 100644
index 0000000..641e25e
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.PCollectionViewTesting;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link SideInputHandler}.
+ */
+@RunWith(JUnit4.class)
+public class SideInputHandlerTest {
+
+  private static final long WINDOW_MSECS_1 = 100;
+  private static final long WINDOW_MSECS_2 = 500;
+
+  private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 =
+      WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));
+
+  private PCollectionView<Iterable<String>> view1 = PCollectionViewTesting.testingView(
+      new TupleTag<Iterable<WindowedValue<String>>>() {},
+      new PCollectionViewTesting.IdentityViewFn<String>(),
+      StringUtf8Coder.of(),
+      windowingStrategy1);
+
+  private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 =
+      WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));
+
+  private PCollectionView<Iterable<String>> view2 = PCollectionViewTesting.testingView(
+      new TupleTag<Iterable<WindowedValue<String>>>() {},
+      new PCollectionViewTesting.IdentityViewFn<String>(),
+      StringUtf8Coder.of(),
+      windowingStrategy2);
+
+  @Test
+  public void testIsEmpty() {
+    SideInputHandler sideInputHandler = new SideInputHandler(
+        ImmutableList.<PCollectionView<?>>of(view1),
+        InMemoryStateInternals.<Void>forKey(null));
+
+    assertFalse(sideInputHandler.isEmpty());
+
+    // create an empty handler
+    SideInputHandler emptySideInputHandler = new SideInputHandler(
+        ImmutableList.<PCollectionView<?>>of(),
+        InMemoryStateInternals.<Void>forKey(null));
+
+    assertTrue(emptySideInputHandler.isEmpty());
+  }
+
+  @Test
+  public void testContains() {
+    SideInputHandler sideInputHandler = new SideInputHandler(
+        ImmutableList.<PCollectionView<?>>of(view1),
+        InMemoryStateInternals.<Void>forKey(null));
+
+    assertTrue(sideInputHandler.contains(view1));
+    assertFalse(sideInputHandler.contains(view2));
+  }
+
+  @Test
+  public void testIsReady() {
+    SideInputHandler sideInputHandler = new SideInputHandler(
+        ImmutableList.<PCollectionView<?>>of(view1, view2),
+        InMemoryStateInternals.<Void>forKey(null));
+
+    IntervalWindow firstWindow =
+        new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_1));
+
+    IntervalWindow secondWindow =
+        new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_2));
+
+
+    // side input should not yet be ready
+    assertFalse(sideInputHandler.isReady(view1, firstWindow));
+
+    // add a value for view1
+    sideInputHandler.addSideInputValue(
+        view1,
+        valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+
+    // now side input should be ready
+    assertTrue(sideInputHandler.isReady(view1, firstWindow));
+
+    // second window input should still not be ready
+    assertFalse(sideInputHandler.isReady(view1, secondWindow));
+  }
+
+  @Test
+  public void testNewInputReplacesPreviousInput() {
+    // New input should completely replace old input.
+    // The creation of the Iterable that holds the side-input contents
+    // happens upstream; that is also where accumulation/discarding
+    // is decided.
+
+    SideInputHandler sideInputHandler = new SideInputHandler(
+        ImmutableList.<PCollectionView<?>>of(view1),
+        InMemoryStateInternals.<Void>forKey(null));
+
+    IntervalWindow window =
+        new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_1));
+
+    // add a first value for view1
+    sideInputHandler.addSideInputValue(
+        view1,
+        valuesInWindow(ImmutableList.of("Hello"), new Instant(0), window));
+
+    Assert.assertThat(sideInputHandler.get(view1, window), contains("Hello"));
+
+    // subsequent values should replace existing values
+    sideInputHandler.addSideInputValue(
+        view1,
+        valuesInWindow(ImmutableList.of("Ciao", "Buongiorno"), new Instant(0), window));
+
+    Assert.assertThat(sideInputHandler.get(view1, window), contains("Ciao", "Buongiorno"));
+  }
+
+  @Test
+  public void testMultipleWindows() {
+    SideInputHandler sideInputHandler = new SideInputHandler(
+        ImmutableList.<PCollectionView<?>>of(view1),
+        InMemoryStateInternals.<Void>forKey(null));
+
+    // two windows that we'll later use for adding elements/retrieving side input
+    IntervalWindow firstWindow =
+        new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_1));
+    IntervalWindow secondWindow =
+        new IntervalWindow(new Instant(1000), new Instant(1000 + WINDOW_MSECS_2));
+
+    // add a first value for view1 in the first window
+    sideInputHandler.addSideInputValue(
+        view1,
+        valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+
+    Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+
+    // add something for second window of view1
+    sideInputHandler.addSideInputValue(
+        view1,
+        valuesInWindow(ImmutableList.of("Arrivederci"), new Instant(0), secondWindow));
+
+    Assert.assertThat(sideInputHandler.get(view1, secondWindow), contains("Arrivederci"));
+
+    // contents for first window should be unaffected
+    Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+  }
+
+  @Test
+  public void testMultipleSideInputs() {
+    SideInputHandler sideInputHandler = new SideInputHandler(
+        ImmutableList.<PCollectionView<?>>of(view1, view2),
+        InMemoryStateInternals.<Void>forKey(null));
+
+    // two windows that we'll later use for adding elements/retrieving side input
+    IntervalWindow firstWindow =
+        new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_1));
+
+    // add value for view1 in the first window
+    sideInputHandler.addSideInputValue(
+        view1,
+        valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+
+    Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+
+    // view2 should not have any data
+    assertFalse(sideInputHandler.isReady(view2, firstWindow));
+
+    // also add some data for view2
+    sideInputHandler.addSideInputValue(
+        view2,
+        valuesInWindow(ImmutableList.of("Salut"), new Instant(0), firstWindow));
+
+    assertTrue(sideInputHandler.isReady(view2, firstWindow));
+    Assert.assertThat(sideInputHandler.get(view2, firstWindow), contains("Salut"));
+
+    // view1 should not be affected by that
+    Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+  }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private WindowedValue<Iterable<?>> valuesInWindow(
+      Iterable<?> values, Instant timestamp, BoundedWindow window) {
+    return (WindowedValue) WindowedValue.of(values, timestamp, window, PaneInfo.NO_FIRING);
+  }
+}
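
The replacement semantics exercised by testNewInputReplacesPreviousInput
rely on the upstream view materialization emitting the complete side-input
contents on every pane; accumulation vs. discarding is decided there. In the
streaming overrides added to FlinkRunner below, that is achieved by
concatenating all elements into a single list per pane, roughly:

  input
      .apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
      .apply(CreateFlinkPCollectionView.<T, List<T>>of(view));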

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 47c4877..b0e88b7 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -18,14 +18,28 @@
 package org.apache.beam.runners.flink;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.slf4j.Logger;
@@ -36,6 +50,7 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -54,6 +69,9 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
    */
   private final FlinkPipelineOptions options;
 
+  /** Custom transforms implementations. */
+  private final Map<Class<?>, Class<?>> overrides;
+
   /**
    * Construct a runner from the provided options.
    *
@@ -93,6 +111,18 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
 
   private FlinkRunner(FlinkPipelineOptions options) {
     this.options = options;
+
+    ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, Class<?>>builder();
+    if (options.isStreaming()) {
+      builder.put(Combine.GloballyAsSingletonView.class,
+          StreamingCombineGloballyAsSingletonView.class);
+      builder.put(View.AsMap.class, StreamingViewAsMap.class);
+      builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class);
+      builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
+      builder.put(View.AsList.class, StreamingViewAsList.class);
+      builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
+    }
+    overrides = builder.build();
   }
 
   @Override
@@ -135,9 +165,27 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
   }
 
   @Override
-  public <Output extends POutput, Input extends PInput> Output apply(
-      PTransform<Input, Output> transform, Input input) {
-    return super.apply(transform, input);
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    if (overrides.containsKey(transform.getClass())) {
+      // It is the responsibility of whoever constructs overrides to ensure this is type safe.
+      @SuppressWarnings("unchecked")
+      Class<PTransform<InputT, OutputT>> transformClass =
+          (Class<PTransform<InputT, OutputT>>) transform.getClass();
+
+      @SuppressWarnings("unchecked")
+      Class<PTransform<InputT, OutputT>> customTransformClass =
+          (Class<PTransform<InputT, OutputT>>) overrides.get(transform.getClass());
+
+      PTransform<InputT, OutputT> customTransform =
+          InstanceBuilder.ofType(customTransformClass)
+              .withArg(transformClass, transform)
+              .build();
+
+      return Pipeline.applyTransform(input, customTransform);
+    } else {
+      return super.apply(transform, input);
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -154,9 +202,10 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
    * @param classLoader The URLClassLoader to use to detect resources to stage.
    * @return A list of absolute paths to the resources the class loader uses.
    * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
-   *                                  of the resources the class loader exposes is not a file resource.
+   *   of the resources the class loader exposes is not a file resource.
    */
-  protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
+  protected static List<String> detectClassPathResourcesToStage(
+      ClassLoader classLoader) {
     if (!(classLoader instanceof URLClassLoader)) {
       String message = String.format("Unable to use ClassLoader to detect classpath elements. "
           + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
@@ -176,4 +225,331 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
     }
     return files;
   }
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+   * for the Flink runner in streaming mode.
+   */
+  private static class StreamingViewAsMap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsMap(View.AsMap<K, V> transform) {
+    }
+
+    @Override
+    public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, V>> view =
+          PCollectionViews.mapView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+//        runner.recordViewUsesNonDeterministicKeyCoder(this);
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMap";
+    }
+  }
+
+  /**
+   * Specialized expansion for {@link
+   * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
+   * Flink runner in streaming mode.
+   */
+  private static class StreamingViewAsMultimap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) {
+    }
+
+    @Override
+    public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, Iterable<V>>> view =
+          PCollectionViews.multimapView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+//        runner.recordViewUsesNonDeterministicKeyCoder(this);
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMultimap";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
+   * Flink runner in streaming mode.
+   */
+  private static class StreamingViewAsList<T>
+      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsList(View.AsList<T> transform) {}
+
+    @Override
+    public PCollectionView<List<T>> apply(PCollection<T> input) {
+      PCollectionView<List<T>> view =
+          PCollectionViews.listView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<T, List<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsList";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
+   * Flink runner in streaming mode.
+   */
+  private static class StreamingViewAsIterable<T>
+      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsIterable(View.AsIterable<T> transform) { }
+
+    @Override
+    public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+      PCollectionView<Iterable<T>> view =
+          PCollectionViews.iterableView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<T, Iterable<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsIterable";
+    }
+  }
+
+  private static class WrapAsList<T> extends OldDoFn<T, List<T>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(Arrays.asList(c.element()));
+    }
+  }
+
+  /**
+   * Specialized expansion for
+   * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton} for the
+   * Flink runner in streaming mode.
+   */
+  private static class StreamingViewAsSingleton<T>
+      extends PTransform<PCollection<T>, PCollectionView<T>> {
+    private View.AsSingleton<T> transform;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<T> apply(PCollection<T> input) {
+      Combine.Globally<T, T> combine = Combine.globally(
+          new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+      if (!transform.hasDefaultValue()) {
+        combine = combine.withoutDefaults();
+      }
+      return input.apply(combine.asSingletonView());
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsSingleton";
+    }
+
+    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+      private boolean hasDefaultValue;
+      private T defaultValue;
+
+      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+        this.hasDefaultValue = hasDefaultValue;
+        this.defaultValue = defaultValue;
+      }
+
+      @Override
+      public T apply(T left, T right) {
+        throw new IllegalArgumentException("PCollection with more than one element "
+            + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+            + "combine the PCollection into a single value");
+      }
+
+      @Override
+      public T identity() {
+        if (hasDefaultValue) {
+          return defaultValue;
+        } else {
+          throw new IllegalArgumentException(
+              "Empty PCollection accessed as a singleton view. "
+                  + "Consider setting withDefault to provide a default value");
+        }
+      }
+    }
+  }
+
+  private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
+      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+    Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingCombineGloballyAsSingletonView(
+        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<OutputT> apply(PCollection<InputT> input) {
+      PCollection<OutputT> combined =
+          input.apply(Combine.globally(transform.getCombineFn())
+              .withoutDefaults()
+              .withFanout(transform.getFanout()));
+
+      PCollectionView<OutputT> view = PCollectionViews.singletonView(
+          combined.getPipeline(),
+          combined.getWindowingStrategy(),
+          transform.getInsertDefault(),
+          transform.getInsertDefault()
+              ? transform.getCombineFn().defaultValue() : null,
+          combined.getCoder());
+      return combined
+          .apply(ParDo.of(new WrapAsList<OutputT>()))
+          .apply(CreateFlinkPCollectionView.<OutputT, OutputT>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingCombineGloballyAsSingletonView";
+    }
+  }
+
+  /**
+   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+   *
+   * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
+   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
+   * They require that the input {@link PCollection} fits in memory.
+   * For a large {@link PCollection} this is expected to crash!
+   *
+   * @param <T> the type of elements to concatenate.
+   */
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<T>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
+
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+  }
+
+  /**
+   * Creates a primitive {@link PCollectionView}.
+   *
+   * <p>For internal use only by runner implementors.
+   *
+   * @param <ElemT> The type of the elements of the input PCollection
+   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+   */
+  public static class CreateFlinkPCollectionView<ElemT, ViewT>
+      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+    private PCollectionView<ViewT> view;
+
+    private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
+      this.view = view;
+    }
+
+    public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of(
+        PCollectionView<ViewT> view) {
+      return new CreateFlinkPCollectionView<>(view);
+    }
+
+    public PCollectionView<ViewT> getView() {
+      return view;
+    }
+
+    @Override
+    public PCollectionView<ViewT> apply(PCollection<List<ElemT>> input) {
+      return view;
+    }
+  }
 }
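
The override mechanism in apply() is generic: a replacement transform only
needs a public single-argument constructor that takes the transform it
overrides, which InstanceBuilder invokes reflectively. A hypothetical
additional override (MyStreamingCombine is not part of this commit) would
follow the same shape:

  // registration in the FlinkRunner constructor
  builder.put(Combine.PerKey.class, MyStreamingCombine.class);

  private static class MyStreamingCombine<K, InputT, OutputT>
      extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {

    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
    public MyStreamingCombine(Combine.PerKey<K, InputT, OutputT> transform) {
    }

    @Override
    public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) {
      // a real override would expand into a runner-specific implementation here
      throw new UnsupportedOperationException("expansion elided in this sketch");
    }
  }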

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index 460933f..2a82749 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -59,7 +59,9 @@ public class TestFlinkRunner extends PipelineRunner<FlinkRunnerResult> {
   @Override
   public FlinkRunnerResult run(Pipeline pipeline) {
     try {
-      return delegate.run(pipeline);
+      FlinkRunnerResult result = delegate.run(pipeline);
+
+      return result;
     } catch (RuntimeException e) {
       // Special case hack to pull out assertion errors from PAssert; instead there should
       // probably be a better story along the lines of UserCodeException.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
index 2e655a3..3bb8c59 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
@@ -28,10 +28,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate the user-provided
- * {@link org.apache.beam.sdk.values.PCollection}-based job into a
+ * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate
+ * the user-provided {@link org.apache.beam.sdk.values.PCollection}-based job into a
  * {@link org.apache.flink.streaming.api.datastream.DataStream} one.
- * */
+ *
+ */
 public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
   private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
@@ -55,8 +56,10 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
     PTransform<?, ?> transform = node.getTransform();
     if (transform != null) {
-      StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
-      if (translator != null) {
+      StreamTransformTranslator<?> translator =
+          FlinkStreamingTransformTranslators.getTranslator(transform);
+
+      if (translator != null && applyCanTranslate(transform, node, translator)) {
         applyStreamingTransform(transform, node, translator);
         LOG.info(genSpaces(this.depth) + "translated-" + formatNodeName(node));
         return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
@@ -79,10 +82,13 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
     // currently visiting and translate it into its Flink alternative.
 
     PTransform<?, ?> transform = node.getTransform();
-    StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
-    if (translator == null) {
+    StreamTransformTranslator<?> translator =
+        FlinkStreamingTransformTranslators.getTranslator(transform);
+
+    if (translator == null || !applyCanTranslate(transform, node, translator)) {
       LOG.info(node.getTransform().getClass().toString());
-      throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
+      throw new UnsupportedOperationException(
+          "The transform " + transform + " is currently not supported.");
     }
     applyStreamingTransform(transform, node, translator);
   }
@@ -92,7 +98,10 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
     // do nothing here
   }
 
-  private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> transform, TransformTreeNode node, StreamTransformTranslator<?> translator) {
+  private <T extends PTransform<?, ?>> void applyStreamingTransform(
+      PTransform<?, ?> transform,
+      TransformTreeNode node,
+      StreamTransformTranslator<?> translator) {
 
     @SuppressWarnings("unchecked")
     T typedTransform = (T) transform;
@@ -106,13 +115,41 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
     typedTranslator.translateNode(typedTransform, streamingContext);
   }
 
+  private <T extends PTransform<?, ?>> boolean applyCanTranslate(
+      PTransform<?, ?> transform,
+      TransformTreeNode node,
+      StreamTransformTranslator<?> translator) {
+
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform;
+
+    @SuppressWarnings("unchecked")
+    StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
+
+    streamingContext.setCurrentTransform(AppliedPTransform.of(
+        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform));
+
+    return typedTranslator.canTranslate(typedTransform, streamingContext);
+  }
+
   /**
    * The interface that every Flink translator of a Beam operator should implement.
    * This interface is for <b>streaming</b> jobs. For examples of such translators see
    * {@link FlinkStreamingTransformTranslators}.
    */
-  public interface StreamTransformTranslator<Type extends PTransform> {
-    void translateNode(Type transform, FlinkStreamingTranslationContext context);
+  abstract static class StreamTransformTranslator<T extends PTransform> {
+
+    /**
+     * Translate the given transform.
+     */
+    abstract void translateNode(T transform, FlinkStreamingTranslationContext context);
+
+    /**
+     * Returns true iff this translator can translate the given transform.
+     */
+    boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
+      return true;
+    }
   }
 
   private static String formatNodeName(TransformTreeNode node) {
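
The new canTranslate() hook makes translator selection conditional on the
concrete transform instance rather than just its class. A hypothetical
translator (MyTransform is a placeholder, assumed to consume a PCollection)
that only claims transforms whose input uses a non-merging WindowFn could
look like:

  private static class MyTranslator<T> extends
      FlinkStreamingPipelineTranslator.StreamTransformTranslator<MyTransform<T>> {

    @Override
    void translateNode(MyTransform<T> transform, FlinkStreamingTranslationContext context) {
      // translation elided in this sketch
    }

    @Override
    boolean canTranslate(MyTransform<T> transform, FlinkStreamingTranslationContext context) {
      // only claim the transform when the input windowing never merges windows
      return context.getInput(transform).getWindowingStrategy().getWindowFn().isNonMerging();
    }
  }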

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 8167623..6c2c703 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -18,31 +18,29 @@
 
 package org.apache.beam.runners.flink.translation;
 
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.FlinkCoder;
-import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.FlinkStreamingCreateFunction;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -53,7 +51,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.SystemReduceFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -68,24 +66,27 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -112,19 +113,22 @@ public class FlinkStreamingTransformTranslators {
 
   // here you can find all the available translators.
   static {
-    TRANSLATORS.put(Create.Values.class, new CreateStreamingTranslator());
     TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
     TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
-    TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
+    TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
     TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
 
-    TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
+    TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
+    TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
 
     TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
+    TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
+    TRANSLATORS.put(
+        FlinkRunner.CreateFlinkPCollectionView.class, new CreateViewStreamingTranslator());
+
+    TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming());
     TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
     TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
-    TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
-    TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
   }
 
   public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
@@ -136,54 +140,8 @@ public class FlinkStreamingTransformTranslators {
   //  Transformation Implementations
   // --------------------------------------------------------------------------------------------
 
-  private static class CreateStreamingTranslator<OutputT> implements
-      FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OutputT>> {
-
-    @Override
-    public void translateNode(
-        Create.Values<OutputT> transform,
-        FlinkStreamingTranslationContext context) {
-
-      PCollection<OutputT> output = context.getOutput(transform);
-      Iterable<OutputT> elements = transform.getElements();
-
-      // we need to serialize the elements to byte arrays, since they might contain
-      // elements that are not serializable by Java serialization. We deserialize them
-      // in the FlatMap function using the Coder.
-
-      List<byte[]> serializedElements = Lists.newArrayList();
-      Coder<OutputT> elementCoder = output.getCoder();
-      for (OutputT element: elements) {
-        ByteArrayOutputStream bao = new ByteArrayOutputStream();
-        try {
-          elementCoder.encode(element, bao, Coder.Context.OUTER);
-          serializedElements.add(bao.toByteArray());
-        } catch (IOException e) {
-          throw new RuntimeException("Could not serialize Create elements using Coder: " + e);
-        }
-      }
-
-
-      DataStream<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1);
-
-      FlinkStreamingCreateFunction<Integer, OutputT> createFunction =
-          new FlinkStreamingCreateFunction<>(serializedElements, elementCoder);
-
-      WindowedValue.ValueOnlyWindowedValueCoder<OutputT> windowCoder =
-          WindowedValue.getValueOnlyCoder(elementCoder);
-
-      TypeInformation<WindowedValue<OutputT>> outputType = new CoderTypeInformation<>(windowCoder);
-
-      DataStream<WindowedValue<OutputT>> outputDataStream = initDataSet
-          .flatMap(createFunction).returns(outputType);
-
-      context.setOutputDataStream(output, outputDataStream);
-    }
-  }
-
-
   private static class TextIOWriteBoundStreamingTranslator<T>
-      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
         TextIO.Write.Bound<T>> {
 
     private static final Logger LOG =
@@ -230,7 +188,7 @@ public class FlinkStreamingTransformTranslators {
   }
 
   private static class WriteSinkStreamingTranslator<T>
-      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
 
     @Override
     public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
@@ -254,29 +212,8 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
-  private static class BoundedReadSourceTranslator<T>
-      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
-
-    @Override
-    public void translateNode(Read.Bounded<T> transform, FlinkStreamingTranslationContext context) {
-
-      BoundedSource<T> boundedSource = transform.getSource();
-      PCollection<T> output = context.getOutput(transform);
-
-      TypeInformation<WindowedValue<T>> typeInfo = context.getTypeInfo(output);
-
-      DataStream<WindowedValue<T>> source = context.getExecutionEnvironment().createInput(
-          new SourceInputFormat<>(
-              boundedSource,
-              context.getPipelineOptions()),
-          typeInfo);
-
-      context.setOutputDataStream(output, source);
-    }
-  }
-
   private static class UnboundedReadSourceTranslator<T>
-      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
 
     @Override
     public void translateNode(
@@ -290,7 +227,8 @@ public class FlinkStreamingTransformTranslators {
         UnboundedFlinkSource<T> flinkSourceFunction =
             (UnboundedFlinkSource<T>) transform.getSource();
 
-        final AssignerWithPeriodicWatermarks<T> flinkAssigner = flinkSourceFunction.getFlinkTimestampAssigner();
+        final AssignerWithPeriodicWatermarks<T> flinkAssigner =
+            flinkSourceFunction.getFlinkTimestampAssigner();
 
         DataStream<T> flinkSource = context.getExecutionEnvironment()
             .addSource(flinkSourceFunction.getFlinkSource());
@@ -332,8 +270,37 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
+  private static class BoundedReadSourceTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
+
+    @Override
+    public void translateNode(
+        Read.Bounded<T> transform,
+        FlinkStreamingTranslationContext context) {
+      PCollection<T> output = context.getOutput(transform);
+
+      DataStream<WindowedValue<T>> source;
+      try {
+        transform.getSource();
+        BoundedSourceWrapper<T> sourceWrapper =
+            new BoundedSourceWrapper<>(
+                context.getPipelineOptions(),
+                transform.getSource(),
+                context.getExecutionEnvironment().getParallelism());
+        source = context
+            .getExecutionEnvironment()
+            .addSource(sourceWrapper).name(transform.getName());
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Error while translating BoundedSource: " + transform.getSource(), e);
+      }
+
+      context.setOutputDataStream(output, source);
+    }
+  }
+
   private static class ParDoBoundStreamingTranslator<InputT, OutputT>
-      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
         ParDo.Bound<InputT, OutputT>> {
 
     @Override
@@ -347,27 +314,292 @@ public class FlinkStreamingTransformTranslators {
       TypeInformation<WindowedValue<OutputT>> typeInfo =
           context.getTypeInfo(context.getOutput(transform));
 
-      DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator = new DoFnOperator<>(
-          transform.getFn(),
-          new TupleTag<OutputT>("main output"),
-          Collections.<TupleTag<?>>emptyList(),
-          new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(),
-          windowingStrategy,
-          new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
-          context.getPipelineOptions());
+      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+
+      @SuppressWarnings("unchecked")
+      PCollection<InputT> inputPCollection = (PCollection<InputT>) context.getInput(transform);
+
+      TypeInformation<WindowedValue<InputT>> inputTypeInfo =
+          context.getTypeInfo(inputPCollection);
+
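+      // without side inputs we can use a plain one-input operator; with side
+      // inputs the union of all side-input streams is connected as a second,
+      // broadcast input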
+      if (sideInputs.isEmpty()) {
+        DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
+            new DoFnOperator<>(
+                transform.getFn(),
+                inputTypeInfo,
+                new TupleTag<OutputT>("main output"),
+                Collections.<TupleTag<?>>emptyList(),
+                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(),
+                windowingStrategy,
+                new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+                Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+                context.getPipelineOptions());
+
+        DataStream<WindowedValue<InputT>> inputDataStream =
+            context.getInputDataStream(context.getInput(transform));
+
+        SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream = inputDataStream
+            .transform(transform.getName(), typeInfo, doFnOperator);
+
+        context.setOutputDataStream(context.getOutput(transform), outDataStream);
+      } else {
+        Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
+            transformSideInputs(sideInputs, context);
+
+        DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
+            new DoFnOperator<>(
+                transform.getFn(),
+                inputTypeInfo,
+                new TupleTag<OutputT>("main output"),
+                Collections.<TupleTag<?>>emptyList(),
+                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(),
+                windowingStrategy,
+                transformedSideInputs.f0,
+                sideInputs,
+                context.getPipelineOptions());
+
+        DataStream<WindowedValue<InputT>> inputDataStream =
+            context.getInputDataStream(context.getInput(transform));
+
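+        // broadcast() replicates every side-input element to all parallel
+        // instances of the operator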
+        SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream = inputDataStream
+            .connect(transformedSideInputs.f1.broadcast())
+            .transform(transform.getName(), typeInfo, doFnOperator);
+
+        context.setOutputDataStream(context.getOutput(transform), outDataStream);
+      }
+    }
+  }
+
+  /**
+   * Wraps each element in a {@link RawUnionValue} with the given tag id.
+   */
+  private static class ToRawUnion<T> implements MapFunction<T, RawUnionValue> {
+    private final int intTag;
+
+    public ToRawUnion(int intTag) {
+      this.intTag = intTag;
+    }
+
+    @Override
+    public RawUnionValue map(T o) throws Exception {
+      return new RawUnionValue(intTag, o);
+    }
+  }
+
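+  /**
+   * Assigns an integer tag to each side input, maps every side-input stream
+   * to {@link RawUnionValue}s carrying that tag and unions all of them into
+   * a single stream that can be broadcast into an operator.
+   */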
+  private static Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>>
+        transformSideInputs(
+          Collection<PCollectionView<?>> sideInputs,
+          FlinkStreamingTranslationContext context) {
+
+    // collect all side inputs
+    Map<TupleTag<?>, Integer> tagToIntMapping = new HashMap<>();
+    Map<Integer, PCollectionView<?>> intToViewMapping = new HashMap<>();
+    int count = 0;
+    for (PCollectionView<?> sideInput: sideInputs) {
+      TupleTag<?> tag = sideInput.getTagInternal();
+      intToViewMapping.put(count, sideInput);
+      tagToIntMapping.put(tag, count);
+      count++;
+    }
+
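+    // collect the coders of all side-input streams to build the UnionCoder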
+    List<Coder<?>> inputCoders = new ArrayList<>();
+    for (PCollectionView<?> sideInput: sideInputs) {
+      DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
+      TypeInformation<Object> tpe = sideInputStream.getType();
+      if (!(tpe instanceof CoderTypeInformation)) {
+        throw new IllegalStateException(
+            "Input Stream TypeInformation is no CoderTypeInformation.");
+      }
+
+      Coder<?> coder = ((CoderTypeInformation) tpe).getCoder();
+      inputCoders.add(coder);
+    }
+
+    UnionCoder unionCoder = UnionCoder.of(inputCoders);
+
+    CoderTypeInformation<RawUnionValue> unionTypeInformation =
+        new CoderTypeInformation<>(unionCoder);
+
+    // transform each side input to RawUnionValue and union them
+    DataStream<RawUnionValue> sideInputUnion = null;
+
+    for (PCollectionView<?> sideInput: sideInputs) {
+      TupleTag<?> tag = sideInput.getTagInternal();
+      final int intTag = tagToIntMapping.get(tag);
+      DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
+      DataStream<RawUnionValue> unionValueStream =
+          sideInputStream.map(new ToRawUnion<>(intTag)).returns(unionTypeInformation);
+
+      if (sideInputUnion == null) {
+        sideInputUnion = unionValueStream;
+      } else {
+        sideInputUnion = sideInputUnion.union(unionValueStream);
+      }
+    }
+
+    if (sideInputUnion == null) {
+      throw new IllegalStateException("No unioned side inputs, this indicates a bug.");
+    }
+
+    return new Tuple2<>(intToViewMapping, sideInputUnion);
+  }
+
+  private static class ParDoBoundMultiStreamingTranslator<InputT, OutputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      ParDo.BoundMulti<InputT, OutputT>> {
+
+    @Override
+    public void translateNode(
+        ParDo.BoundMulti<InputT, OutputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      // we assume that the transformation does not change the windowing strategy.
+      WindowingStrategy<?, ?> windowingStrategy =
+          context.getInput(transform).getWindowingStrategy();
+
+      Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
+
+      Map<TupleTag<?>, Integer> tagsToLabels =
+          transformTupleTagsToLabels(transform.getMainOutputTag(), outputs.keySet());
+
+      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+
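+      // all outputs are emitted as a single stream of tagged RawUnionValues
+      // that is split into the individual output streams below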
+      SingleOutputStreamOperator<RawUnionValue> unionOutputStream;
+
+      @SuppressWarnings("unchecked")
+      PCollection<InputT> inputPCollection = (PCollection<InputT>) context.getInput(transform);
+
+      TypeInformation<WindowedValue<InputT>> inputTypeInfo =
+          context.getTypeInfo(inputPCollection);
+
+      if (sideInputs.isEmpty()) {
+        DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
+            new DoFnOperator<>(
+                transform.getFn(),
+                inputTypeInfo,
+                transform.getMainOutputTag(),
+                transform.getSideOutputTags().getAll(),
+                new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+                windowingStrategy,
+                new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+                Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+                context.getPipelineOptions());
+
+        UnionCoder outputUnionCoder = createUnionCoder(outputs.values());
+
+        CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
+            new CoderTypeInformation<>(outputUnionCoder);
+
+        DataStream<WindowedValue<InputT>> inputDataStream =
+            context.getInputDataStream(context.getInput(transform));
+
+        unionOutputStream = inputDataStream
+            .transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
+
+      } else {
+        Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
+            transformSideInputs(sideInputs, context);
+
+        DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
+            new DoFnOperator<>(
+                transform.getFn(),
+                inputTypeInfo,
+                transform.getMainOutputTag(),
+                transform.getSideOutputTags().getAll(),
+                new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+                windowingStrategy,
+                transformedSideInputs.f0,
+                sideInputs,
+                context.getPipelineOptions());
+
+        UnionCoder outputUnionCoder = createUnionCoder(outputs.values());
+
+        CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
+            new CoderTypeInformation<>(outputUnionCoder);
+
+        DataStream<WindowedValue<InputT>> inputDataStream =
+            context.getInputDataStream(context.getInput(transform));
+
+        unionOutputStream = inputDataStream
+            .connect(transformedSideInputs.f1.broadcast())
+            .transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
+      }
+
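+      // extract the individual output streams by filtering on the union tag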
+      for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
+        final int outputTag = tagsToLabels.get(output.getKey());
+
+        TypeInformation outputTypeInfo =
+            context.getTypeInfo(output.getValue());
+
+        @SuppressWarnings("unchecked")
+        DataStream filtered =
+            unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
+              @Override
+              public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
+                if (value.getUnionTag() == outputTag) {
+                  out.collect(value.getValue());
+                }
+              }
+            }).returns(outputTypeInfo);
 
-      DataStream<WindowedValue<InputT>> inputDataStream =
+        context.setOutputDataStream(output.getValue(), filtered);
+      }
+    }
+
+    private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
+        TupleTag<?> mainTag,
+        Set<TupleTag<?>> secondaryTags) {
+
+      Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
+      int count = 0;
+      tagToLabelMap.put(mainTag, count++);
+      for (TupleTag<?> tag : secondaryTags) {
+        if (!tagToLabelMap.containsKey(tag)) {
+          tagToLabelMap.put(tag, count++);
+        }
+      }
+      return tagToLabelMap;
+    }
+
+    private UnionCoder createUnionCoder(Collection<PCollection<?>> taggedCollections) {
+      List<Coder<?>> outputCoders = Lists.newArrayList();
+      for (PCollection<?> coll : taggedCollections) {
+        WindowedValue.FullWindowedValueCoder<?> windowedValueCoder =
+            WindowedValue.getFullCoder(
+                coll.getCoder(),
+                coll.getWindowingStrategy().getWindowFn().windowCoder());
+        outputCoders.add(windowedValueCoder);
+      }
+      return UnionCoder.of(outputCoders);
+    }
+  }
+
+  private static class CreateViewStreamingTranslator<ElemT, ViewT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      FlinkRunner.CreateFlinkPCollectionView<ElemT, ViewT>> {
+
+    @Override
+    public void translateNode(
+        FlinkRunner.CreateFlinkPCollectionView<ElemT, ViewT> transform,
+        FlinkStreamingTranslationContext context) {
+      // just forward the stream; transformSideInputs() will pick it up again
+      // via the PCollectionView when translating the consuming transform
+      DataStream<WindowedValue<List<ElemT>>> inputDataSet =
           context.getInputDataStream(context.getInput(transform));
 
-      SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream =
-          inputDataStream.transform(transform.getName(), typeInfo, doFnOperator);
+      PCollectionView<ViewT> input = transform.getView();
 
-      context.setOutputDataStream(context.getOutput(transform), outDataStream);
+      context.setOutputDataStream(input, inputDataSet);
     }
   }
 
   private static class WindowBoundTranslator<T>
-      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
 
     @Override
     public void translateNode(
@@ -384,17 +616,26 @@ public class FlinkStreamingTransformTranslators {
       OldDoFn<T, T> windowAssignerDoFn =
           createWindowAssigner(windowingStrategy.getWindowFn());
 
+      PCollection<T> inputPCollection = context.getInput(transform);
+
+      TypeInformation<WindowedValue<T>> inputTypeInfo =
+          context.getTypeInfo(inputPCollection);
+
       DoFnOperator<T, T, WindowedValue<T>> doFnOperator = new DoFnOperator<>(
           windowAssignerDoFn,
+          inputTypeInfo,
           new TupleTag<T>("main output"),
           Collections.<TupleTag<?>>emptyList(),
           new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<T>>(),
           windowingStrategy,
-          new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
+          new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+          Collections.<PCollectionView<?>>emptyList(), /* side inputs */
           context.getPipelineOptions());
 
       DataStream<WindowedValue<T>> inputDataStream =
           context.getInputDataStream(context.getInput(transform));
+
       SingleOutputStreamOperator<WindowedValue<T>> outDataStream = inputDataStream
           .transform(transform.getName(), typeInfo, doFnOperator);
 
@@ -433,8 +674,25 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
+  private static class ReshuffleTranslatorStreaming<K, InputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Reshuffle<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        Reshuffle<K, InputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      DataStream<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataStream(context.getInput(transform));
+
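+      // rebalance() redistributes elements round-robin across all parallel
+      // instances, which serves as the streaming equivalent of a reshuffle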
+      context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance());
+    }
+  }
+
   private static class GroupByKeyTranslator<K, InputT>
-      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> {
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> {
 
     @Override
     public void translateNode(
@@ -456,10 +714,11 @@ public class FlinkStreamingTransformTranslators {
 
       DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
 
-
-      WindowedValue.ValueOnlyWindowedValueCoder<
-          SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
-          WindowedValue.getValueOnlyCoder(workItemCoder);
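+      // use the full coder so that the window information of each work item
+      // is preserved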
+      WindowedValue.FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>>
+          windowedWorkItemCoder = WindowedValue.getFullCoder(
+              workItemCoder,
+              input.getWindowingStrategy().getWindowFn().windowCoder());
 
       CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
           new CoderTypeInformation<>(windowedWorkItemCoder);
@@ -484,17 +743,16 @@ public class FlinkStreamingTransformTranslators {
             WindowedValue<KV<K, Iterable<InputT>>>> outputManagerFactory =
           new DoFnOperator.DefaultOutputManagerFactory<>();
 
-      WindowDoFnOperator<
-            K,
-            InputT,
-            KV<K, Iterable<InputT>>> doFnOperator =
+      WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
           new WindowDoFnOperator<>(
               reduceFn,
+              (TypeInformation) workItemTypeInfo,
               new TupleTag<KV<K, Iterable<InputT>>>("main output"),
               Collections.<TupleTag<?>>emptyList(),
               outputManagerFactory,
               windowingStrategy,
-              new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
+              new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+              Collections.<PCollectionView<?>>emptyList(), /* side inputs */
               context.getPipelineOptions(),
               inputKvCoder.getKeyCoder());
 
@@ -514,8 +772,25 @@ public class FlinkStreamingTransformTranslators {
   }
 
   private static class CombinePerKeyTranslator<K, InputT, OutputT>
-      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-        Combine.PerKey<K, InputT, OutputT>> {
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      Combine.PerKey<K, InputT, OutputT>> {
+
+    @Override
+    boolean canTranslate(
+        Combine.PerKey<K, InputT, OutputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      // if we have a merging window strategy and side inputs we cannot
+      // translate as a proper combine. We have to group and then run the combine
+      // over the final grouped values.
+      PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<?, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
+
+      return windowingStrategy.getWindowFn().isNonMerging() || transform.getSideInputs().isEmpty();
+    }
 
     @Override
     public void translateNode(
@@ -537,10 +812,11 @@ public class FlinkStreamingTransformTranslators {
 
       DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
 
-
-      WindowedValue.ValueOnlyWindowedValueCoder<
-            SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
-          WindowedValue.getValueOnlyCoder(workItemCoder);
+      WindowedValue.FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>>
+          windowedWorkItemCoder = WindowedValue.getFullCoder(
+              workItemCoder,
+              input.getWindowingStrategy().getWindowFn().windowCoder());
 
       CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
           new CoderTypeInformation<>(windowedWorkItemCoder);
@@ -560,32 +836,78 @@ public class FlinkStreamingTransformTranslators {
           AppliedCombineFn.withInputCoder(
               transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder));
 
-
-      OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> windowDoFn =
-          GroupAlsoByWindowViaWindowSetDoFn.create(windowingStrategy, reduceFn);
-
-
       TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
           context.getTypeInfo(context.getOutput(transform));
 
-      WindowDoFnOperator<K, InputT, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> doFnOperator =
-          new WindowDoFnOperator<>(
-              windowDoFn,
-              new TupleTag<KV<K, OutputT>>("main output"),
-              Collections.<TupleTag<?>>emptyList(),
-              new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
-              windowingStrategy,
-              new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
-              context.getPipelineOptions(),
-              inputKvCoder.getKeyCoder());
-
-      // our operator excepts WindowedValue<KeyedWorkItem> while our input stream
-      // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
-      @SuppressWarnings("unchecked")
-      SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream = keyedWorkItemStream
-          .transform(transform.getName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator);
+      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+
+      if (sideInputs.isEmpty()) {
+
+        WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+            new WindowDoFnOperator<>(
+                reduceFn,
+                (TypeInformation) workItemTypeInfo,
+                new TupleTag<KV<K, OutputT>>("main output"),
+                Collections.<TupleTag<?>>emptyList(),
+                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
+                windowingStrategy,
+                new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+                Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+                context.getPipelineOptions(),
+                inputKvCoder.getKeyCoder());
+
+        // our operator expects WindowedValue<KeyedWorkItem> while our input stream
+        // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
+        @SuppressWarnings("unchecked")
+        SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
+            keyedWorkItemStream.transform(
+                transform.getName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator);
 
-      context.setOutputDataStream(context.getOutput(transform), outDataStream);
+        context.setOutputDataStream(context.getOutput(transform), outDataStream);
+      } else {
+        Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs =
+            transformSideInputs(sideInputs, context);
+
+        WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+            new WindowDoFnOperator<>(
+                reduceFn,
+                (TypeInformation) workItemTypeInfo,
+                new TupleTag<KV<K, OutputT>>("main output"),
+                Collections.<TupleTag<?>>emptyList(),
+                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
+                windowingStrategy,
+                transformSideInputs.f0,
+                sideInputs,
+                context.getPipelineOptions(),
+                inputKvCoder.getKeyCoder());
+
+        // we have to manually construct the two-input transform because we're
+        // not allowed to have only one input keyed, normally.
+
+        TwoInputTransformation<
+            WindowedValue<SingletonKeyedWorkItem<K, InputT>>,
+            RawUnionValue,
+            WindowedValue<KV<K, OutputT>>> rawFlinkTransform = new TwoInputTransformation<>(
+            keyedWorkItemStream.getTransformation(),
+            transformSideInputs.f1.broadcast().getTransformation(),
+            transform.getName(),
+            (TwoInputStreamOperator) doFnOperator,
+            outputTypeInfo,
+            keyedWorkItemStream.getParallelism());
+
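+        // only the main input is keyed, the broadcast side-input stream
+        // carries no key, hence the null key selector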
+        rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
+        rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null);
+
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
+            new SingleOutputStreamOperator(
+                keyedWorkItemStream.getExecutionEnvironment(),
+                rawFlinkTransform) {}; // we have to cheat around the ctor being protected
+
+        keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
+
+        context.setOutputDataStream(context.getOutput(transform), outDataStream);
+      }
     }
 
     private static class ToKeyedWorkItem<K, InputT>
@@ -595,21 +917,27 @@ public class FlinkStreamingTransformTranslators {
 
       @Override
       public void flatMap(
-          WindowedValue<KV<K, InputT>> in,
+          WindowedValue<KV<K, InputT>> inWithMultipleWindows,
           Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception {
 
-        SingletonKeyedWorkItem<K, InputT> workItem =
-            new SingletonKeyedWorkItem<>(
-                in.getValue().getKey(),
-                in.withValue(in.getValue().getValue()));
-
-        out.collect(WindowedValue.valueInEmptyWindows(workItem));
+        // we need to emit one work item per window for now
+        // since otherwise the PushbackSideInputRunner will not correctly
+        // determine whether side inputs are ready
+        for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) {
+          SingletonKeyedWorkItem<K, InputT> workItem =
+              new SingletonKeyedWorkItem<>(
+                  in.getValue().getKey(),
+                  in.withValue(in.getValue().getValue()));
+
+          out.collect(in.withValue(workItem));
+        }
       }
     }
   }
 
   private static class FlattenPCollectionTranslator<T>
-      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
         Flatten.FlattenPCollectionList<T>> {
 
     @Override
@@ -617,91 +945,38 @@ public class FlinkStreamingTransformTranslators {
         Flatten.FlattenPCollectionList<T> transform,
         FlinkStreamingTranslationContext context) {
       List<PCollection<T>> allInputs = context.getInput(transform).getAll();
-      DataStream<T> result = null;
-      for (PCollection<T> collection : allInputs) {
-        DataStream<T> current = context.getInputDataStream(collection);
-        result = (result == null) ? current : result.union(current);
-      }
-      context.setOutputDataStream(context.getOutput(transform), result);
-    }
-  }
-
-  private static class ParDoBoundMultiStreamingTranslator<InputT, OutputT>
-      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-        ParDo.BoundMulti<InputT, OutputT>> {
-
-    @Override
-    public void translateNode(
-        ParDo.BoundMulti<InputT, OutputT> transform,
-        FlinkStreamingTranslationContext context) {
-
-      // we assume that the transformation does not change the windowing strategy.
-      WindowingStrategy<?, ?> windowingStrategy =
-          context.getInput(transform).getWindowingStrategy();
-
-      Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
-
-      Map<TupleTag<?>, Integer> tagsToLabels =
-          transformTupleTagsToLabels(transform.getMainOutputTag(), outputs.keySet());
-
-      DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = new DoFnOperator<>(
-          transform.getFn(),
-          transform.getMainOutputTag(),
-          transform.getSideOutputTags().getAll(),
-          new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
-          windowingStrategy,
-          new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
-          context.getPipelineOptions());
-
-      UnionCoder unionCoder = createUnionCoder(outputs.values());
 
-      CoderTypeInformation<RawUnionValue> unionTypeInformation =
-          new CoderTypeInformation<>(unionCoder);
+      if (allInputs.isEmpty()) {
 
-      DataStream<WindowedValue<InputT>> inputDataStream =
-          context.getInputDataStream(context.getInput(transform));
+        // we cannot create an empty source in Flink, so to satisfy downstream
+        // operations we create a dummy source that emits a single element and
+        // add a flatMap that never forwards it
+        DataStreamSource<String> dummySource =
+            context.getExecutionEnvironment().fromElements("dummy");
 
-      SingleOutputStreamOperator<RawUnionValue> unionStream = inputDataStream
-          .transform(transform.getName(), unionTypeInformation, doFnOperator);
-
-      for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
-        final int outputTag = tagsToLabels.get(output.getKey());
-
-        TypeInformation outputTypeInfo =
-            context.getTypeInfo(output.getValue());
-
-        unionStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
-          @Override
-          public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
-            if (value.getUnionTag() == outputTag) {
-              out.collect(value.getValue());
-            }
-          }
-        }).returns(outputTypeInfo);
-      }
-    }
-
-    private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
-        TupleTag<?> mainTag,
-        Set<TupleTag<?>> secondaryTags) {
+        DataStream<WindowedValue<T>> result = dummySource.flatMap(
+            new FlatMapFunction<String, WindowedValue<T>>() {
+              @Override
+              public void flatMap(
+                  String s,
+                  Collector<WindowedValue<T>> collector) throws Exception {
+                // never return anything
+              }
+            }).returns(
+            new CoderTypeInformation<>(
+                WindowedValue.getFullCoder(
+                    (Coder<T>) VoidCoder.of(),
+                    GlobalWindow.Coder.INSTANCE)));
+        context.setOutputDataStream(context.getOutput(transform), result);
 
-      Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
-      int count = 0;
-      tagToLabelMap.put(mainTag, count++);
-      for (TupleTag<?> tag : secondaryTags) {
-        if (!tagToLabelMap.containsKey(tag)) {
-          tagToLabelMap.put(tag, count++);
+      } else {
+        DataStream<T> result = null;
+        for (PCollection<T> collection : allInputs) {
+          DataStream<T> current = context.getInputDataStream(collection);
+          result = (result == null) ? current : result.union(current);
         }
+        context.setOutputDataStream(context.getOutput(transform), result);
       }
-      return tagToLabelMap;
-    }
-
-    private UnionCoder createUnionCoder(Collection<PCollection<?>> taggedCollections) {
-      List<Coder<?>> outputCoders = Lists.newArrayList();
-      for (PCollection<?> coll : taggedCollections) {
-        outputCoders.add(coll.getCoder());
-      }
-      return UnionCoder.of(outputCoders);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
index 71cc6b7..61abf9a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
@@ -40,6 +40,10 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi
     this.coder = coder;
   }
 
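+  /** Returns the {@link Coder} that backs this type information. */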
+  public Coder<T> getCoder() {
+    return coder;
+  }
+
   @Override
   public boolean isBasicType() {
     return false;