You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/24 19:52:48 UTC

[10/17] incubator-beam git commit: [BEAM-102] Add Side Inputs in Flink Streaming Runner

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index e273132..092a226 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -17,9 +17,13 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
+import avro.shaded.com.google.common.base.Preconditions;
+
+import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
@@ -27,9 +31,13 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.DoFnRunner;
 import org.apache.beam.sdk.util.DoFnRunners;
 import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.NullSideInputReader;
+import org.apache.beam.sdk.util.PushbackSideInputDoFnRunner;
+import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -37,15 +45,36 @@ import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
+import com.google.common.collect.Iterables;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -58,7 +87,8 @@ import java.util.Map;
  */
 public class DoFnOperator<InputT, FnOutputT, OutputT>
     extends AbstractStreamOperator<OutputT>
-    implements OneInputStreamOperator<WindowedValue<InputT>, OutputT> {
+    implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>,
+      TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT> {
 
   protected OldDoFn<InputT, FnOutputT> doFn;
   protected final SerializedPipelineOptions serializedOptions;
@@ -66,7 +96,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   protected final TupleTag<FnOutputT> mainOutputTag;
   protected final List<TupleTag<?>> sideOutputTags;
 
-  protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+  protected final Collection<PCollectionView<?>> sideInputs;
+  protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
 
   protected final boolean hasSideInputs;
 
@@ -74,25 +105,36 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   protected final OutputManagerFactory<OutputT> outputManagerFactory;
 
-  protected transient DoFnRunner<InputT, FnOutputT> doFnRunner;
+  protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner;
 
-  /**
-   * To keep track of the current watermark so that we can immediately fire if a trigger
-   * registers an event time callback for a timestamp that lies in the past.
-   */
-  protected transient long currentWatermark = Long.MIN_VALUE;
+  protected transient SideInputHandler sideInputHandler;
+
+  protected transient long currentInputWatermark;
+
+  protected transient long currentOutputWatermark;
+
+  private transient AbstractStateBackend sideInputStateBackend;
+
+  private final ReducingStateDescriptor<Long> pushedBackWatermarkDescriptor;
+
+  private final ListStateDescriptor<WindowedValue<InputT>> pushedBackDescriptor;
+
+  private transient Map<String, KvStateSnapshot<?, ?, ?, ?, ?>> restoredSideInputState;
 
   public DoFnOperator(
       OldDoFn<InputT, FnOutputT> doFn,
+      TypeInformation<WindowedValue<InputT>> inputType,
       TupleTag<FnOutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       OutputManagerFactory<OutputT> outputManagerFactory,
       WindowingStrategy<?, ?> windowingStrategy,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+      Map<Integer, PCollectionView<?>> sideInputTagMapping,
+      Collection<PCollectionView<?>> sideInputs,
       PipelineOptions options) {
     this.doFn = doFn;
     this.mainOutputTag = mainOutputTag;
     this.sideOutputTags = sideOutputTags;
+    this.sideInputTagMapping = sideInputTagMapping;
     this.sideInputs = sideInputs;
     this.serializedOptions = new SerializedPipelineOptions(options);
     this.windowingStrategy = windowingStrategy;
@@ -100,6 +142,15 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
     this.hasSideInputs = !sideInputs.isEmpty();
 
+    this.pushedBackWatermarkDescriptor =
+        new ReducingStateDescriptor<>(
+            "pushed-back-elements-watermark-hold",
+            new LongMinReducer(),
+            LongSerializer.INSTANCE);
+
+    this.pushedBackDescriptor =
+        new ListStateDescriptor<>("pushed-back-values", inputType);
+
     setChainingStrategy(ChainingStrategy.ALWAYS);
   }
 
@@ -119,7 +170,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
     this.doFn = getDoFn();
 
-    Aggregator.AggregatorFactory aggregatorFactory = new Aggregator.AggregatorFactory() {
+    currentInputWatermark = Long.MIN_VALUE;
+    currentOutputWatermark = currentInputWatermark;
+
+   	Aggregator.AggregatorFactory aggregatorFactory = new Aggregator.AggregatorFactory() {
       @Override
       public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
           Class<?> fnClass,
@@ -134,10 +188,42 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       }
     };
 
-    doFnRunner = DoFnRunners.createDefault(
+    SideInputReader sideInputReader = NullSideInputReader.of(sideInputs);
+    if (!sideInputs.isEmpty()) {
+      String operatorIdentifier =
+          this.getClass().getSimpleName() + "_"
+              + getRuntimeContext().getIndexOfThisSubtask() + "_sideInput";
+
+      sideInputStateBackend = this
+          .getContainingTask()
+          .createStateBackend(operatorIdentifier,
+              new GenericTypeInfo<>(ByteBuffer.class).createSerializer(new ExecutionConfig()));
+
+      Preconditions.checkState(
+          sideInputStateBackend != null,
+          "Side input state backend cannot be bull");
+
+      if (restoredSideInputState != null) {
+        @SuppressWarnings("unchecked,rawtypes")
+        HashMap<String, KvStateSnapshot> castRestored = (HashMap) restoredSideInputState;
+        sideInputStateBackend.injectKeyValueStateSnapshots(castRestored, 0L);
+        restoredSideInputState = null;
+      }
+
+      sideInputStateBackend.setCurrentKey(
+          ByteBuffer.wrap(CoderUtils.encodeToByteArray(VoidCoder.of(), null)));
+
+      StateInternals<Void> sideInputStateInternals =
+          new FlinkStateInternals<>(sideInputStateBackend, VoidCoder.of());
+
+      sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
+      sideInputReader = sideInputHandler;
+    }
+
+    DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.createDefault(
         serializedOptions.getPipelineOptions(),
         doFn,
-        null,
+        sideInputReader,
         outputManagerFactory.create(output),
         mainOutputTag,
         sideOutputTags,
@@ -145,25 +231,177 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
         aggregatorFactory,
         windowingStrategy);
 
-    doFnRunner.startBundle();
+    pushbackDoFnRunner =
+        PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+
     doFn.setup();
   }
 
   @Override
   public void close() throws Exception {
     super.close();
-    doFnRunner.finishBundle();
     doFn.teardown();
   }
 
+  protected final long getPushbackWatermarkHold() {
+    // if we don't have side inputs we never hold the watermark
+    if (sideInputs.isEmpty()) {
+      return Long.MAX_VALUE;
+    }
+
+    try {
+      Long result = sideInputStateBackend.getPartitionedState(
+          null,
+          VoidSerializer.INSTANCE,
+          pushedBackWatermarkDescriptor).get();
+      return result != null ? result : Long.MAX_VALUE;
+    } catch (Exception e) {
+      throw new RuntimeException("Error retrieving pushed back watermark state.", e);
+    }
+  }
+
   @Override
-  public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
-    doFnRunner.processElement(streamRecord.getValue());
+  public final void processElement(
+      StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
+    pushbackDoFnRunner.startBundle();
+    pushbackDoFnRunner.processElement(streamRecord.getValue());
+    pushbackDoFnRunner.finishBundle();
+  }
+
+  @Override
+  public final void processElement1(
+      StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
+    pushbackDoFnRunner.startBundle();
+    Iterable<WindowedValue<InputT>> justPushedBack =
+        pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue());
+
+    ListState<WindowedValue<InputT>> pushedBack =
+        sideInputStateBackend.getPartitionedState(
+            null,
+            VoidSerializer.INSTANCE,
+            pushedBackDescriptor);
+
+    ReducingState<Long> pushedBackWatermark =
+        sideInputStateBackend.getPartitionedState(
+            null,
+            VoidSerializer.INSTANCE,
+            pushedBackWatermarkDescriptor);
+
+    for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
+      pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
+      pushedBack.add(pushedBackValue);
+    }
+    pushbackDoFnRunner.finishBundle();
+  }
+
+  @Override
+  public final void processElement2(
+      StreamRecord<RawUnionValue> streamRecord) throws Exception {
+    pushbackDoFnRunner.startBundle();
+
+    @SuppressWarnings("unchecked")
+    WindowedValue<Iterable<?>> value =
+        (WindowedValue<Iterable<?>>) streamRecord.getValue().getValue();
+
+    PCollectionView<?> sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag());
+    sideInputHandler.addSideInputValue(sideInput, value);
+
+    ListState<WindowedValue<InputT>> pushedBack =
+        sideInputStateBackend.getPartitionedState(
+            null,
+            VoidSerializer.INSTANCE,
+            pushedBackDescriptor);
+
+    List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
+    for (WindowedValue<InputT> elem: pushedBack.get()) {
+
+      // we need to set the correct key in case the operator is
+      // a (keyed) window operator
+      setKeyContextElement1(new StreamRecord<>(elem));
+
+      Iterable<WindowedValue<InputT>> justPushedBack =
+          pushbackDoFnRunner.processElementInReadyWindows(elem);
+      Iterables.addAll(newPushedBack, justPushedBack);
+    }
+
+
+    ReducingState<Long> pushedBackWatermark =
+        sideInputStateBackend.getPartitionedState(
+            null,
+            VoidSerializer.INSTANCE,
+            pushedBackWatermarkDescriptor);
+
+    pushedBack.clear();
+    pushedBackWatermark.clear();
+    for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
+      pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
+      pushedBack.add(pushedBackValue);
+    }
+
+    pushbackDoFnRunner.finishBundle();
+
+    // maybe output a new watermark
+    processWatermark1(new Watermark(currentInputWatermark));
   }
 
   @Override
   public void processWatermark(Watermark mark) throws Exception {
-    output.emitWatermark(mark);
+    processWatermark1(mark);
+  }
+
+  @Override
+  public void processWatermark1(Watermark mark) throws Exception {
+    this.currentInputWatermark = mark.getTimestamp();
+    long potentialOutputWatermark =
+        Math.min(getPushbackWatermarkHold(), currentInputWatermark);
+    if (potentialOutputWatermark > currentOutputWatermark) {
+      currentOutputWatermark = potentialOutputWatermark;
+      output.emitWatermark(new Watermark(currentOutputWatermark));
+    }
+  }
+
+  @Override
+  public void processWatermark2(Watermark mark) throws Exception {
+    // ignore watermarks from the side-input input
+  }
+
+  @Override
+  public StreamTaskState snapshotOperatorState(
+      long checkpointId,
+      long timestamp) throws Exception {
+
+    StreamTaskState streamTaskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+    if (sideInputStateBackend != null) {
+      // we have to manually checkpoint the side-input state backend and store
+      // the handle in the "user state" of the task state
+      HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> sideInputSnapshot =
+          sideInputStateBackend.snapshotPartitionedState(checkpointId, timestamp);
+
+      if (sideInputSnapshot != null) {
+        @SuppressWarnings("unchecked,rawtypes")
+        StateHandle<Serializable> sideInputStateHandle =
+            (StateHandle) sideInputStateBackend.checkpointStateSerializable(
+                sideInputSnapshot, checkpointId, timestamp);
+
+        streamTaskState.setFunctionState(sideInputStateHandle);
+      }
+    }
+
+    return streamTaskState;
+  }
+
+  @Override
+  public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
+    super.restoreState(state, recoveryTimestamp);
+
+    @SuppressWarnings("unchecked,rawtypes")
+    StateHandle<HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>>> sideInputStateHandle =
+        (StateHandle) state.getFunctionState();
+
+    if (sideInputStateHandle != null) {
+      restoredSideInputState = sideInputStateHandle.getState(getUserCodeClassloader());
+    }
   }
 
   /**
@@ -223,6 +461,16 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   }
 
   /**
+   * For determining the pushback watermark in a {@link ReducingStateDescriptor}.
+   */
+  private static class LongMinReducer implements ReduceFunction<Long> {
+    @Override
+    public Long reduce(Long a, Long b) throws Exception {
+      return Math.min(a, b);
+    }
+  }
+
+  /**
    * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow
    * accessing state or timer internals.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 0a279cc..73c1eed 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -52,6 +53,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -71,12 +73,6 @@ import javax.annotation.Nullable;
 public class WindowDoFnOperator<K, InputT, OutputT>
     extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> {
 
-  /**
-   * To keep track of the current watermark so that we can immediately fire if a trigger
-   * registers an event time callback for a timestamp that lies in the past.
-   */
-  private transient long currentWatermark = Long.MIN_VALUE;
-
   private final Coder<K> keyCoder;
   private final TimerInternals.TimerDataCoder timerCoder;
 
@@ -89,19 +85,23 @@ public class WindowDoFnOperator<K, InputT, OutputT>
 
   public WindowDoFnOperator(
       SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn,
+      TypeInformation<WindowedValue<KeyedWorkItem<K, InputT>>> inputType,
       TupleTag<KV<K, OutputT>> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       OutputManagerFactory<WindowedValue<KV<K, OutputT>>> outputManagerFactory,
       WindowingStrategy<?, ?> windowingStrategy,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+      Map<Integer, PCollectionView<?>> sideInputTagMapping,
+      Collection<PCollectionView<?>> sideInputs,
       PipelineOptions options,
       Coder<K> keyCoder) {
     super(
         null,
+        inputType,
         mainOutputTag,
         sideOutputTags,
         outputManagerFactory,
         windowingStrategy,
+        sideInputTagMapping,
         sideInputs,
         options);
 
@@ -184,21 +184,33 @@ public class WindowDoFnOperator<K, InputT, OutputT>
 
   @Override
   public void processWatermark(Watermark mark) throws Exception {
-    this.currentWatermark = mark.getTimestamp();
+    processWatermark1(mark);
+  }
+
+  @Override
+  public void processWatermark1(Watermark mark) throws Exception {
+    pushbackDoFnRunner.startBundle();
+
+    this.currentInputWatermark = mark.getTimestamp();
+
+    // hold back by the pushed back values waiting for side inputs
+    long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
 
     boolean fire;
 
     do {
       Tuple2<ByteBuffer, TimerInternals.TimerData> timer = watermarkTimersQueue.peek();
-      if (timer != null && timer.f1.getTimestamp().getMillis() <= mark.getTimestamp()) {
+      if (timer != null && timer.f1.getTimestamp().getMillis() < actualInputWatermark) {
         fire = true;
 
+        System.out.println("FIRING: " + timer);
+
         watermarkTimersQueue.remove();
         watermarkTimers.remove(timer);
 
         setKeyContext(timer.f0);
 
-        doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+        pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
                 KeyedWorkItems.<K, InputT>timersWorkItem(
                     stateInternals.getKey(),
                     Collections.singletonList(timer.f1))));
@@ -210,9 +222,16 @@ public class WindowDoFnOperator<K, InputT, OutputT>
 
     Instant watermarkHold = stateInternals.watermarkHold();
 
-    long outputWatermark = Math.min(currentWatermark, watermarkHold.getMillis());
+    long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());
+
+    long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold);
+
+    if (potentialOutputWatermark > currentOutputWatermark) {
+      currentOutputWatermark = potentialOutputWatermark;
+      output.emitWatermark(new Watermark(currentOutputWatermark));
+    }
+    pushbackDoFnRunner.finishBundle();
 
-    output.emitWatermark(new Watermark(outputWatermark));
   }
 
   @Override
@@ -311,13 +330,13 @@ public class WindowDoFnOperator<K, InputT, OutputT>
 
         @Override
         public Instant currentInputWatermarkTime() {
-          return new Instant(currentWatermark);
+          return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold()));
         }
 
         @Nullable
         @Override
         public Instant currentOutputWatermarkTime() {
-          return new Instant(currentWatermark);
+          return new Instant(currentOutputWatermark);
         }
       };
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
new file mode 100644
index 0000000..9d983b0
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Wrapper for executing {@link BoundedSource BoundedSources} as a Flink Source.
+ */
+public class BoundedSourceWrapper<OutputT>
+    extends RichParallelSourceFunction<WindowedValue<OutputT>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class);
+
+  /**
+   * Keep the options so that we can initialize the readers.
+   */
+  private final SerializedPipelineOptions serializedOptions;
+
+  /**
+   * The split sources. We split them in the constructor to ensure that all parallel
+   * sources are consistent about the split sources.
+   */
+  private List<? extends BoundedSource<OutputT>> splitSources;
+
+  /**
+   * Make it a field so that we can access it in {@link #close()}.
+   */
+  private transient List<BoundedSource.BoundedReader<OutputT>> readers;
+
+  /**
+   * Initialize here and not in run() to prevent races where we cancel a job before run() is
+   * ever called or run() is called after cancel().
+   */
+  private volatile boolean isRunning = true;
+
+  @SuppressWarnings("unchecked")
+  public BoundedSourceWrapper(
+      PipelineOptions pipelineOptions,
+      BoundedSource<OutputT> source,
+      int parallelism) throws Exception {
+    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+
+    long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism;
+
+    // get the splits early. we assume that the generated splits are stable,
+    // this is necessary so that the mapping of state to source is correct
+    // when restoring
+    splitSources = source.splitIntoBundles(desiredBundleSize, pipelineOptions);
+  }
+
+  @Override
+  public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
+    if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
+      throw new RuntimeException(
+          "Cannot emit watermarks, this hints at a misconfiguration/bug.");
+    }
+
+    // figure out which split sources we're responsible for
+    int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+    int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+
+    List<BoundedSource<OutputT>> localSources = new ArrayList<>();
+
+    for (int i = 0; i < splitSources.size(); i++) {
+      if (i % numSubtasks == subtaskIndex) {
+        localSources.add(splitSources.get(i));
+      }
+    }
+
+    LOG.info("Bounded Flink Source {}/{} is reading from sources: {}",
+        subtaskIndex,
+        numSubtasks,
+        localSources);
+
+    readers = new ArrayList<>();
+    // initialize readers from scratch
+    for (BoundedSource<OutputT> source : localSources) {
+      readers.add(source.createReader(serializedOptions.getPipelineOptions()));
+    }
+
+   if (readers.size() == 1) {
+      // the easy case, we just read from one reader
+      BoundedSource.BoundedReader<OutputT> reader = readers.get(0);
+
+      boolean dataAvailable = reader.start();
+      if (dataAvailable) {
+        emitElement(ctx, reader);
+      }
+
+      while (isRunning) {
+        dataAvailable = reader.advance();
+
+        if (dataAvailable)  {
+          emitElement(ctx, reader);
+        } else {
+          break;
+        }
+      }
+    } else {
+      // a bit more complicated, we are responsible for several readers
+      // loop through them and sleep if none of them had any data
+
+      int currentReader = 0;
+
+      // start each reader and emit data if immediately available
+      for (BoundedSource.BoundedReader<OutputT> reader : readers) {
+        boolean dataAvailable = reader.start();
+        if (dataAvailable) {
+          emitElement(ctx, reader);
+        }
+      }
+
+      // a flag telling us whether any of the readers had data
+      // if no reader had data, sleep for a bit
+      boolean hadData = false;
+      while (isRunning && !readers.isEmpty()) {
+        BoundedSource.BoundedReader<OutputT> reader = readers.get(currentReader);
+        boolean dataAvailable = reader.advance();
+
+        if (dataAvailable) {
+          emitElement(ctx, reader);
+          hadData = true;
+        } else {
+          readers.remove(currentReader);
+          currentReader--;
+          if (readers.isEmpty()) {
+            break;
+          }
+        }
+
+        currentReader = (currentReader + 1) % readers.size();
+        if (currentReader == 0 && !hadData) {
+          Thread.sleep(50);
+        } else if (currentReader == 0) {
+          hadData = false;
+        }
+      }
+
+    }
+
+    // emit final Long.MAX_VALUE watermark, just to be sure
+    ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
+  }
+
+  /**
+   * Emit the current element from the given Reader. The reader is guaranteed to have data.
+   */
+  private void emitElement(
+      SourceContext<WindowedValue<OutputT>> ctx,
+      BoundedSource.BoundedReader<OutputT> reader) {
+    // make sure that reader state update and element emission are atomic
+    // with respect to snapshots
+    synchronized (ctx.getCheckpointLock()) {
+
+      OutputT item = reader.getCurrent();
+      Instant timestamp = reader.getCurrentTimestamp();
+
+      WindowedValue<OutputT> windowedValue =
+          WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+      ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+    if (readers != null) {
+      for (BoundedSource.BoundedReader<OutputT> reader: readers) {
+        reader.close();
+      }
+    }
+  }
+
+  @Override
+  public void cancel() {
+    isRunning = false;
+  }
+
+  /**
+   * Visible so that we can check this in tests. Must not be used for anything else.
+   */
+  @VisibleForTesting
+  public List<? extends BoundedSource<OutputT>> getSplitSources() {
+    return splitSources;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
deleted file mode 100644
index 0d72f65..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-import java.io.ByteArrayInputStream;
-import java.util.List;
-
-/**
- * This flat map function bootstraps from collection elements and turns them into WindowedValues
- * (as required by the Flink runner).
- */
-public class FlinkStreamingCreateFunction<IN, OUT> implements FlatMapFunction<IN, WindowedValue<OUT>> {
-
-  private final List<byte[]> elements;
-  private final Coder<OUT> coder;
-
-  public FlinkStreamingCreateFunction(List<byte[]> elements, Coder<OUT> coder) {
-    this.elements = elements;
-    this.coder = coder;
-  }
-
-  @Override
-  public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception {
-
-    for (byte[] element : elements) {
-      ByteArrayInputStream bai = new ByteArrayInputStream(element);
-      OUT outValue = coder.decode(bai, Coder.Context.OUTER);
-
-      out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
new file mode 100644
index 0000000..fb1b1e8
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PCollectionViewTesting;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.flink.shaded.com.google.common.base.Function;
+import org.apache.flink.shaded.com.google.common.base.Predicate;
+import org.apache.flink.shaded.com.google.common.collect.FluentIterable;
+import org.apache.flink.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Collections;
+import java.util.HashMap;
+import javax.annotation.Nullable;
+
+/**
+ * Tests for {@link DoFnOperator}.
+ */
+@RunWith(JUnit4.class)
+public class DoFnOperatorTest {
+
+  // Fixed-window sizes (ms), windowing strategies and views used as side inputs in testSideInputs.
+  private static final long WINDOW_MSECS_1 = 100; // window size of windowingStrategy1
+  private static final long WINDOW_MSECS_2 = 500; // window size of windowingStrategy2
+
+  private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 =
+      WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));
+
+  private PCollectionView<Iterable<String>> view1 = PCollectionViewTesting.testingView(
+      new TupleTag<Iterable<WindowedValue<String>>>() {},
+      new PCollectionViewTesting.IdentityViewFn<String>(),
+      StringUtf8Coder.of(),
+      windowingStrategy1);
+
+  private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 =
+      WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));
+
+  private PCollectionView<Iterable<String>> view2 = PCollectionViewTesting.testingView(
+      new TupleTag<Iterable<WindowedValue<String>>>() {},
+      new PCollectionViewTesting.IdentityViewFn<String>(),
+      StringUtf8Coder.of(),
+      windowingStrategy2);
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testSingleOutput() throws Exception {
+    // Type information for the operator's input: value-only WindowedValue<String>.
+    CoderTypeInformation<WindowedValue<String>> inputTypeInfo =
+        new CoderTypeInformation<>(WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()));
+
+    TupleTag<String> mainTag = new TupleTag<>("main-output");
+
+    // Identity DoFn with a single main output and no side inputs.
+    DoFnOperator<String, String, String> operator = new DoFnOperator<>(
+        new IdentityDoFn<String>(),
+        inputTypeInfo,
+        mainTag,
+        Collections.<TupleTag<?>>emptyList(),
+        new DoFnOperator.DefaultOutputManagerFactory(),
+        WindowingStrategy.globalDefault(),
+        new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+        Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+        PipelineOptionsFactory.as(FlinkPipelineOptions.class));
+
+    OneInputStreamOperatorTestHarness<WindowedValue<String>, String> harness =
+        new OneInputStreamOperatorTestHarness<>(operator);
+
+    harness.open();
+
+    // Feed a single element through the operator ...
+    harness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("Hello")));
+
+    // ... and expect exactly that element to come out again.
+    Iterable<WindowedValue<String>> emitted =
+        this.<String>stripStreamRecordFromWindowedValue(harness.getOutput());
+    assertThat(emitted, contains(WindowedValue.valueInGlobalWindow("Hello")));
+
+    harness.close();
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testMultiOutputOutput() throws Exception {
+    // Type information for the operator's input: value-only WindowedValue<String>.
+    CoderTypeInformation<WindowedValue<String>> inputTypeInfo =
+        new CoderTypeInformation<>(WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()));
+
+    TupleTag<String> mainTag = new TupleTag<>("main-output");
+    TupleTag<String> sideTag1 = new TupleTag<>("side-output-1");
+    TupleTag<String> sideTag2 = new TupleTag<>("side-output-2");
+
+    // Output mapping: main output -> 1, first side output -> 2, second side output -> 3.
+    ImmutableMap<TupleTag<?>, Integer> tagToIndex =
+        ImmutableMap.<TupleTag<?>, Integer>of(mainTag, 1, sideTag1, 2, sideTag2, 3);
+
+    DoFnOperator<String, String, RawUnionValue> operator = new DoFnOperator<>(
+        new MultiOutputDoFn(sideTag1, sideTag2),
+        inputTypeInfo,
+        mainTag,
+        ImmutableList.<TupleTag<?>>of(sideTag1, sideTag2),
+        new DoFnOperator.MultiOutputOutputManagerFactory(tagToIndex),
+        WindowingStrategy.globalDefault(),
+        new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+        Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+        PipelineOptionsFactory.as(FlinkPipelineOptions.class));
+
+    OneInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue> harness =
+        new OneInputStreamOperatorTestHarness<>(operator);
+
+    harness.open();
+
+    // "one" and "two" each yield one side output; "hello" fans out to all three outputs.
+    for (String element : ImmutableList.of("one", "two", "hello")) {
+      harness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow(element)));
+    }
+
+    // Every output is expected as a RawUnionValue carrying the mapped index, in emission order.
+    Iterable<RawUnionValue> emitted =
+        this.stripStreamRecordFromRawUnion(harness.getOutput());
+    assertThat(
+        emitted,
+        contains(
+            new RawUnionValue(2, WindowedValue.valueInGlobalWindow("side: one")),
+            new RawUnionValue(3, WindowedValue.valueInGlobalWindow("side: two")),
+            new RawUnionValue(1, WindowedValue.valueInGlobalWindow("got: hello")),
+            new RawUnionValue(2, WindowedValue.valueInGlobalWindow("got: hello")),
+            new RawUnionValue(3, WindowedValue.valueInGlobalWindow("got: hello"))));
+
+    harness.close();
+  }
+
+  /**
+   * For now, this test is ignored because {@link TwoInputStreamOperatorTestHarness} is not
+   * sufficiently well equipped to handle more complex operators that require a state backend.
+   * We have to revisit this once we update to a newer version of Flink and also add some more
+   * tests that verify pushback behaviour and watermark hold behaviour.
+   *
+   * <p>The behaviour that we would test here is also exercised by the
+   * {@link org.apache.beam.sdk.testing.RunnableOnService} tests, so the code is not untested.
+   */
+  @Test
+  @Ignore
+  @SuppressWarnings("unchecked")
+  public void testSideInputs() throws Exception {
+    // Type information for the operator's first input: value-only WindowedValue<String>.
+    CoderTypeInformation<WindowedValue<String>> inputTypeInfo =
+        new CoderTypeInformation<>(WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()));
+
+    TupleTag<String> mainTag = new TupleTag<>("main-output");
+
+    // Side-input mapping: 1 -> view1, 2 -> view2.
+    ImmutableMap<Integer, PCollectionView<?>> indexToView =
+        ImmutableMap.<Integer, PCollectionView<?>>builder()
+            .put(1, view1)
+            .put(2, view2)
+            .build();
+
+    DoFnOperator<String, String, String> operator = new DoFnOperator<>(
+        new IdentityDoFn<String>(),
+        inputTypeInfo,
+        mainTag,
+        Collections.<TupleTag<?>>emptyList(),
+        new DoFnOperator.DefaultOutputManagerFactory(),
+        WindowingStrategy.globalDefault(),
+        indexToView, /* side-input mapping */
+        ImmutableList.<PCollectionView<?>>of(view1, view2), /* side inputs */
+        PipelineOptionsFactory.as(FlinkPipelineOptions.class));
+
+    TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, String> harness =
+        new TwoInputStreamOperatorTestHarness<>(operator);
+
+    harness.open();
+
+    Instant epoch = new Instant(0);
+    IntervalWindow firstWindow = new IntervalWindow(epoch, new Instant(100));
+
+    // push in some side-input elements on the second input, one record per view
+    harness.processElement2(
+        new StreamRecord<>(
+            new RawUnionValue(
+                1,
+                valuesInWindow(ImmutableList.of("hello", "ciao"), epoch, firstWindow))));
+
+    harness.processElement2(
+        new StreamRecord<>(
+            new RawUnionValue(
+                2,
+                valuesInWindow(ImmutableList.of("foo", "bar"), epoch, firstWindow))));
+
+    // push in a regular element on the first input
+    harness.processElement1(new StreamRecord<>(WindowedValue.valueInGlobalWindow("Hello")));
+
+    // the regular element is expected to be emitted unchanged
+    Iterable<WindowedValue<String>> emitted =
+        this.<String>stripStreamRecordFromWindowedValue(harness.getOutput());
+    assertThat(emitted, contains(WindowedValue.valueInGlobalWindow("Hello")));
+
+    harness.close();
+  }
+
+  private <T> Iterable<WindowedValue<T>> stripStreamRecordFromWindowedValue(
+      Iterable<Object> input) {
+    // Keep only StreamRecords whose value is a WindowedValue, then unwrap to that value.
+    return FluentIterable.from(input).filter(new Predicate<Object>() {
+      @Override
+      public boolean apply(@Nullable Object o) {
+        return o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof WindowedValue;
+      }
+    }).transform(new Function<Object, WindowedValue<T>>() {
+      @Nullable
+      @Override
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      public WindowedValue<T> apply(@Nullable Object o) {
+        if (o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof WindowedValue) {
+          return (WindowedValue) ((StreamRecord) o).getValue();
+        }
+        throw new RuntimeException("unreachable"); // the filter already rejected everything else
+      }
+    });
+  }
+
+  private Iterable<RawUnionValue> stripStreamRecordFromRawUnion(Iterable<Object> input) {
+    return FluentIterable.from(input).filter(new Predicate<Object>() {
+      @Override
+      public boolean apply(@Nullable Object o) { // keep StreamRecords holding a RawUnionValue
+        return o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof RawUnionValue;
+      }
+    }).transform(new Function<Object, RawUnionValue>() { // unwrap the records that passed
+      @Nullable
+      @Override
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      public RawUnionValue apply(@Nullable Object o) {
+        if (o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof RawUnionValue) {
+          return (RawUnionValue) ((StreamRecord) o).getValue();
+        }
+        throw new RuntimeException("unreachable"); // the filter already rejected everything else
+      }
+    });
+  }
+
+  private static class MultiOutputDoFn extends OldDoFn<String, String> {
+    private TupleTag<String> sideOutput1;
+    private TupleTag<String> sideOutput2;
+
+    public MultiOutputDoFn(TupleTag<String> firstSideTag, TupleTag<String> secondSideTag) {
+      sideOutput1 = firstSideTag;
+      sideOutput2 = secondSideTag;
+    }
+
+    @Override
+    public void processElement(ProcessContext context) throws Exception {
+      if (context.element().equals("one")) { // "one" goes to the first side output only
+        context.sideOutput(sideOutput1, "side: one");
+      } else if (context.element().equals("two")) { // "two" goes to the second side output only
+        context.sideOutput(sideOutput2, "side: two");
+      } else { // anything else is echoed to the main output and to both side outputs
+        context.output("got: " + context.element());
+        context.sideOutput(sideOutput1, "got: " + context.element());
+        context.sideOutput(sideOutput2, "got: " + context.element());
+      }
+    }
+  }
+
+  private static class IdentityDoFn<T> extends OldDoFn<T, T> {
+    @Override
+    public void processElement(ProcessContext context) throws Exception {
+      context.output(context.element()); // forward every input element unchanged
+    }
+  }
+
+  /** Wraps {@code values} as a single WindowedValue in {@code window} with no pane firing. */
+  private WindowedValue<Iterable<?>> valuesInWindow(
+      Iterable<?> values, Instant timestamp, BoundedWindow window) {
+    return WindowedValue.<Iterable<?>>of(values, timestamp, window, PaneInfo.NO_FIRING);
+  }
+
+  /** Wraps {@code value} as a WindowedValue in {@code window}; no test in this class calls it. */
+  private <T> WindowedValue<T> valueInWindow(
+      T value, Instant timestamp, BoundedWindow window) {
+    return WindowedValue.of(value, timestamp, window, PaneInfo.NO_FIRING);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index f5a52f5..1122179 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -91,7 +91,7 @@ public class UnboundedSourceWrapperTest {
 
             @Override
             public void collect(
-                StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) {
+                StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
 
               count++;
               if (count >= NUM_ELEMENTS) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
index 159b100..07bfe69 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
@@ -48,4 +48,29 @@ public class RawUnionValue {
   public String toString() {
     return unionTag + ":" + value;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    // Identity short-circuit; otherwise require the exact same class (no subclass equality).
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    RawUnionValue other = (RawUnionValue) o;
+    // Compare the int tag first, then the payload, which may be null on either side.
+    if (unionTag != other.unionTag) {
+      return false;
+    }
+    boolean bothNull = value == null && other.value == null;
+    return bothNull || (value != null && value.equals(other.value));
+  }
+
+  @Override
+  public int hashCode() {
+    // Combines the tag and the payload hash with the multiplier 31; a null payload hashes as 0.
+    int valueHash = (value == null) ? 0 : value.hashCode();
+    return 31 * unionTag + valueHash;
+  }
 }