You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/16 03:47:39 UTC

[1/2] beam git commit: [BEAM-1612] Support real Bundle in Flink runner

Repository: beam
Updated Branches:
  refs/heads/master 3a8b0b68c -> 724eda37e


[BEAM-1612] Support real Bundle in Flink runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ceec7ce5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ceec7ce5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ceec7ce5

Branch: refs/heads/master
Commit: ceec7ce5ba287ab40ee1f7c87129b72d4db1c1c7
Parents: 3a8b0b6
Author: JingsongLi <lz...@aliyun.com>
Authored: Thu Jun 15 17:48:59 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Wed Aug 16 11:44:49 2017 +0800

----------------------------------------------------------------------
 .../runners/flink/FlinkPipelineOptions.java     |  11 +
 .../FlinkStreamingTransformTranslators.java     |  77 ++--
 .../wrappers/streaming/DoFnOperator.java        | 412 ++++++++++++++-----
 .../streaming/SplittableDoFnOperator.java       |   4 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   4 +-
 .../state/FlinkSplitStateInternals.java         |   8 +-
 .../beam/runners/flink/PipelineOptionsTest.java |  21 +-
 .../flink/streaming/DoFnOperatorTest.java       | 161 ++++++--
 8 files changed, 535 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index c255672..2432394 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -127,4 +127,15 @@ public interface FlinkPipelineOptions
   @Default.Boolean(false)
   Boolean getRetainExternalizedCheckpointsOnCancellation();
   void setRetainExternalizedCheckpointsOnCancellation(Boolean retainOnCancellation);
+
+  @Description("The maximum number of elements in a bundle.")
+  @Default.Long(1000)
+  Long getMaxBundleSize();
+  void setMaxBundleSize(Long size);
+
+  @Description("The maximum time to wait before finalising a bundle (in milliseconds).")
+  @Default.Long(1000)
+  Long getMaxBundleTimeMills();
+  void setMaxBundleTimeMills(Long time);
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 3d7e81f..058e195 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -339,7 +339,9 @@ class FlinkStreamingTransformTranslators {
           List<TupleTag<?>> additionalOutputTags,
           FlinkStreamingTranslationContext context,
           WindowingStrategy<?, ?> windowingStrategy,
-          Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToLabels,
+          Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
+          Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders,
+          Map<TupleTag<?>, Integer> tagsToIds,
           Coder<WindowedValue<InputT>> inputCoder,
           Coder keyCoder,
           Map<Integer, PCollectionView<?>> transformedSideInputs);
@@ -360,15 +362,27 @@ class FlinkStreamingTransformTranslators {
       WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
 
       Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags = Maps.newHashMap();
+      Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders = Maps.newHashMap();
+
+      // We associate output tags with ids because an Integer is easier to serialize than a
+      // TupleTag. The map returned by AppliedPTransform.getOutputs() is an ImmutableMap (a
+      // RegularImmutableMap), whose entrySet order is the same as the insertion order,
+      // so we can use the original AppliedPTransform.getOutputs() to produce deterministic ids.
+      Map<TupleTag<?>, Integer> tagsToIds = Maps.newHashMap();
+      int idCount = 0;
+      tagsToIds.put(mainOutputTag, idCount++);
       for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
         if (!tagsToOutputTags.containsKey(entry.getKey())) {
           tagsToOutputTags.put(
               entry.getKey(),
-              new OutputTag<WindowedValue<?>>(
+              new OutputTag<>(
                   entry.getKey().getId(),
                   (TypeInformation) context.getTypeInfo((PCollection<?>) entry.getValue())
               )
           );
+          tagsToCoders.put(entry.getKey(),
+              (Coder) context.getCoder((PCollection<OutputT>) entry.getValue()));
+          tagsToIds.put(entry.getKey(), idCount++);
         }
       }
 
@@ -409,6 +423,8 @@ class FlinkStreamingTransformTranslators {
                 context,
                 windowingStrategy,
                 tagsToOutputTags,
+                tagsToCoders,
+                tagsToIds,
                 inputCoder,
                 keyCoder,
                 new HashMap<Integer, PCollectionView<?>>() /* side-input mapping */);
@@ -430,6 +446,8 @@ class FlinkStreamingTransformTranslators {
                 context,
                 windowingStrategy,
                 tagsToOutputTags,
+                tagsToCoders,
+                tagsToIds,
                 inputCoder,
                 keyCoder,
                 transformedSideInputs.f0);
@@ -506,6 +524,8 @@ class FlinkStreamingTransformTranslators {
                 FlinkStreamingTranslationContext context,
                 WindowingStrategy<?, ?> windowingStrategy,
                 Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
+                Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders,
+                Map<TupleTag<?>, Integer> tagsToIds,
                 Coder<WindowedValue<InputT>> inputCoder,
                 Coder keyCoder,
                 Map<Integer, PCollectionView<?>> transformedSideInputs) {
@@ -515,7 +535,8 @@ class FlinkStreamingTransformTranslators {
                   inputCoder,
                   mainOutputTag,
                   additionalOutputTags,
-                  new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags),
+                  new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                      mainOutputTag, tagsToOutputTags, tagsToCoders, tagsToIds),
                   windowingStrategy,
                   transformedSideInputs,
                   sideInputs,
@@ -551,25 +572,28 @@ class FlinkStreamingTransformTranslators {
             @Override
             public DoFnOperator<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
                 createDoFnOperator(
-                    DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn,
-                    String stepName,
-                    List<PCollectionView<?>> sideInputs,
-                    TupleTag<OutputT> mainOutputTag,
-                    List<TupleTag<?>> additionalOutputTags,
-                    FlinkStreamingTranslationContext context,
-                    WindowingStrategy<?, ?> windowingStrategy,
-                    Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
-                    Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>>
-                        inputCoder,
-                    Coder keyCoder,
-                    Map<Integer, PCollectionView<?>> transformedSideInputs) {
+                DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn,
+                String stepName,
+                List<PCollectionView<?>> sideInputs,
+                TupleTag<OutputT> mainOutputTag,
+                List<TupleTag<?>> additionalOutputTags,
+                FlinkStreamingTranslationContext context,
+                WindowingStrategy<?, ?> windowingStrategy,
+                Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
+                Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders,
+                Map<TupleTag<?>, Integer> tagsToIds,
+                Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>>
+                    inputCoder,
+                Coder keyCoder,
+                Map<Integer, PCollectionView<?>> transformedSideInputs) {
               return new SplittableDoFnOperator<>(
                   doFn,
                   stepName,
                   inputCoder,
                   mainOutputTag,
                   additionalOutputTags,
-                  new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags),
+                  new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                      mainOutputTag, tagsToOutputTags, tagsToCoders, tagsToIds),
                   windowingStrategy,
                   transformedSideInputs,
                   sideInputs,
@@ -693,20 +717,21 @@ class FlinkStreamingTransformTranslators {
       SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, BoundedWindow> reduceFn =
           SystemReduceFn.buffering(inputKvCoder.getValueCoder());
 
+      Coder<WindowedValue<KV<K, Iterable<InputT>>>> outputCoder =
+          context.getCoder(context.getOutput(transform));
       TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo =
           context.getTypeInfo(context.getOutput(transform));
 
-      DoFnOperator.DefaultOutputManagerFactory<KV<K, Iterable<InputT>>> outputManagerFactory =
-          new DoFnOperator.DefaultOutputManagerFactory<>();
+      TupleTag<KV<K, Iterable<InputT>>> mainTag = new TupleTag<>("main output");
 
       WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
           new WindowDoFnOperator<>(
               reduceFn,
               context.getCurrentTransform().getFullName(),
               (Coder) windowedWorkItemCoder,
-              new TupleTag<KV<K, Iterable<InputT>>>("main output"),
+              mainTag,
               Collections.<TupleTag<?>>emptyList(),
-              outputManagerFactory,
+              new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder),
               windowingStrategy,
               new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
               Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -793,6 +818,8 @@ class FlinkStreamingTransformTranslators {
           AppliedCombineFn.withInputCoder(
               transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder));
 
+      Coder<WindowedValue<KV<K, OutputT>>> outputCoder =
+          context.getCoder(context.getOutput(transform));
       TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
           context.getTypeInfo(context.getOutput(transform));
 
@@ -800,14 +827,15 @@ class FlinkStreamingTransformTranslators {
 
       if (sideInputs.isEmpty()) {
 
+        TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
         WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
             new WindowDoFnOperator<>(
                 reduceFn,
                 context.getCurrentTransform().getFullName(),
                 (Coder) windowedWorkItemCoder,
-                new TupleTag<KV<K, OutputT>>("main output"),
+                mainTag,
                 Collections.<TupleTag<?>>emptyList(),
-                new DoFnOperator.DefaultOutputManagerFactory<KV<K, OutputT>>(),
+                new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder),
                 windowingStrategy,
                 new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
                 Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -826,14 +854,15 @@ class FlinkStreamingTransformTranslators {
         Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs =
             transformSideInputs(sideInputs, context);
 
+        TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
         WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
             new WindowDoFnOperator<>(
                 reduceFn,
                 context.getCurrentTransform().getFullName(),
                 (Coder) windowedWorkItemCoder,
-                new TupleTag<KV<K, OutputT>>("main output"),
+                mainTag,
                 Collections.<TupleTag<?>>emptyList(),
-                new DoFnOperator.DefaultOutputManagerFactory<KV<K, OutputT>>(),
+                new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder),
                 windowingStrategy,
                 transformSideInputs.f0,
                 sideInputs,

http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 62de423..0bf860a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -21,14 +21,20 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
@@ -45,6 +51,7 @@ import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
@@ -57,6 +64,8 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkS
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.TimeDomain;
@@ -66,6 +75,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -87,6 +97,7 @@ import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.util.OutputTag;
 import org.joda.time.Instant;
 
@@ -95,8 +106,6 @@ import org.joda.time.Instant;
  *
  * @param <InputT> the input type of the {@link DoFn}
  * @param <OutputT> the output type of the {@link DoFn}
- * @param <OutputT> the output type of the operator, this can be different from the fn output
- *                 type when we have additional tagged outputs
  */
 public class DoFnOperator<InputT, OutputT>
     extends AbstractStreamOperator<WindowedValue<OutputT>>
@@ -125,7 +134,7 @@ public class DoFnOperator<InputT, OutputT>
 
   protected transient SideInputReader sideInputReader;
 
-  protected transient DoFnRunners.OutputManager outputManager;
+  protected transient BufferedOutputManager<OutputT> outputManager;
 
   private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
 
@@ -137,7 +146,7 @@ public class DoFnOperator<InputT, OutputT>
 
   private transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
 
-  protected transient FlinkStateInternals<?> stateInternals;
+  protected transient FlinkStateInternals<?> keyedStateInternals;
 
   private final String stepName;
 
@@ -147,14 +156,24 @@ public class DoFnOperator<InputT, OutputT>
 
   private final TimerInternals.TimerDataCoder timerCoder;
 
+  private final long maxBundleSize;
+
+  private final long maxBundleTimeMills;
+
   protected transient HeapInternalTimerService<?, TimerInternals.TimerData> timerService;
 
   protected transient FlinkTimerInternals timerInternals;
 
-  private transient StateInternals pushbackStateInternals;
+  private transient StateInternals nonKeyedStateInternals;
 
   private transient Optional<Long> pushedBackWatermark;
 
+  // bundle control
+  private transient boolean bundleStarted = false;
+  private transient long elementCount;
+  private transient long lastFinishBundleTime;
+  private transient ScheduledFuture<?> checkFinishBundleTimer;
+
   public DoFnOperator(
       DoFn<InputT, OutputT> doFn,
       String stepName,
@@ -184,10 +203,11 @@ public class DoFnOperator<InputT, OutputT>
 
     this.timerCoder =
         TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
-  }
 
-  private org.apache.beam.runners.core.StepContext createStepContext() {
-    return new StepContext();
+    FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
+
+    this.maxBundleSize = flinkOptions.getMaxBundleSize();
+    this.maxBundleTimeMills = flinkOptions.getMaxBundleTimeMills();
   }
 
   // allow overriding this in WindowDoFnOperator because this one dynamically creates
@@ -204,8 +224,21 @@ public class DoFnOperator<InputT, OutputT>
     setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
     setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
 
+    FlinkPipelineOptions options =
+        serializedOptions.get().as(FlinkPipelineOptions.class);
     sideInputReader = NullSideInputReader.of(sideInputs);
 
+    // maybe init by initializeState
+    if (nonKeyedStateInternals == null) {
+      if (keyCoder != null) {
+        nonKeyedStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
+            getKeyedStateBackend());
+      } else {
+        nonKeyedStateInternals =
+            new FlinkSplitStateInternals<>(getOperatorStateBackend());
+      }
+    }
+
     if (!sideInputs.isEmpty()) {
 
       pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
@@ -217,26 +250,14 @@ public class DoFnOperator<InputT, OutputT>
       sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
       sideInputReader = sideInputHandler;
 
-      // maybe init by initializeState
-      if (pushbackStateInternals == null) {
-        if (keyCoder != null) {
-          pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
-              getKeyedStateBackend());
-        } else {
-          pushbackStateInternals =
-              new FlinkSplitStateInternals<Object>(getOperatorStateBackend());
-        }
-      }
-
       pushedBackWatermark = Optional.absent();
-
     }
 
-    outputManager = outputManagerFactory.create(output);
+    outputManager = outputManagerFactory.create(output, nonKeyedStateInternals);
 
     // StatefulPardo or WindowDoFn
     if (keyCoder != null) {
-      stateInternals = new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(),
+      keyedStateInternals = new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(),
           keyCoder);
 
       timerService = (HeapInternalTimerService<?, TimerInternals.TimerData>)
@@ -253,10 +274,10 @@ public class DoFnOperator<InputT, OutputT>
 
     doFnInvoker.invokeSetup();
 
-    org.apache.beam.runners.core.StepContext stepContext = createStepContext();
+    StepContext stepContext = new FlinkStepContext();
 
     doFnRunner = DoFnRunners.simpleRunner(
-        serializedOptions.get(),
+        options,
         doFn,
         sideInputReader,
         outputManager,
@@ -301,11 +322,24 @@ public class DoFnOperator<InputT, OutputT>
           stateCleaner);
     }
 
-    if ((serializedOptions.get().as(FlinkPipelineOptions.class))
-        .getEnableMetrics()) {
+    if (options.getEnableMetrics()) {
       doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
     }
 
+    elementCount = 0L;
+    lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
+
+    // Schedule timer to check timeout of finish bundle.
+    long bundleCheckPeriod = (maxBundleTimeMills + 1) / 2;
+    checkFinishBundleTimer = getProcessingTimeService().scheduleAtFixedRate(
+        new ProcessingTimeCallback() {
+          @Override
+          public void onProcessingTime(long timestamp) throws Exception {
+            checkInvokeFinishBundleByTime();
+          }
+        },
+        bundleCheckPeriod, bundleCheckPeriod);
+
     pushbackDoFnRunner =
         SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
   }
@@ -315,9 +349,9 @@ public class DoFnOperator<InputT, OutputT>
     super.close();
 
     // sanity check: these should have been flushed out by +Inf watermarks
-    if (pushbackStateInternals != null) {
+    if (!sideInputs.isEmpty() && nonKeyedStateInternals != null) {
       BagState<WindowedValue<InputT>> pushedBack =
-          pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+          nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
 
       Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
       if (pushedBackContents != null) {
@@ -328,10 +362,11 @@ public class DoFnOperator<InputT, OutputT>
         }
       }
     }
+    checkFinishBundleTimer.cancel(true);
     doFnInvoker.invokeTeardown();
   }
 
-  protected final long getPushbackWatermarkHold() {
+  private long getPushbackWatermarkHold() {
     // if we don't have side inputs we never hold the watermark
     if (sideInputs.isEmpty()) {
       return Long.MAX_VALUE;
@@ -351,7 +386,7 @@ public class DoFnOperator<InputT, OutputT>
     if (!pushedBackWatermark.isPresent()) {
 
       BagState<WindowedValue<InputT>> pushedBack =
-          pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+          nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
 
       long min = Long.MAX_VALUE;
       for (WindowedValue<InputT> value : pushedBack.read()) {
@@ -364,9 +399,9 @@ public class DoFnOperator<InputT, OutputT>
   @Override
   public final void processElement(
       StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
-    doFnRunner.startBundle();
+    checkInvokeStartBundle();
     doFnRunner.processElement(streamRecord.getValue());
-    doFnRunner.finishBundle();
+    checkInvokeFinishBundleByCount();
   }
 
   private void setPushedBackWatermark(long watermark) {
@@ -376,12 +411,12 @@ public class DoFnOperator<InputT, OutputT>
   @Override
   public final void processElement1(
       StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
-    pushbackDoFnRunner.startBundle();
+    checkInvokeStartBundle();
     Iterable<WindowedValue<InputT>> justPushedBack =
         pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue());
 
     BagState<WindowedValue<InputT>> pushedBack =
-        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+        nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
 
     checkInitPushedBackWatermark();
 
@@ -391,13 +426,13 @@ public class DoFnOperator<InputT, OutputT>
       pushedBack.add(pushedBackValue);
     }
     setPushedBackWatermark(min);
-    pushbackDoFnRunner.finishBundle();
+    checkInvokeFinishBundleByCount();
   }
 
   @Override
   public final void processElement2(
       StreamRecord<RawUnionValue> streamRecord) throws Exception {
-    pushbackDoFnRunner.startBundle();
+    checkInvokeStartBundle();
 
     @SuppressWarnings("unchecked")
     WindowedValue<Iterable<?>> value =
@@ -407,7 +442,7 @@ public class DoFnOperator<InputT, OutputT>
     sideInputHandler.addSideInputValue(sideInput, value);
 
     BagState<WindowedValue<InputT>> pushedBack =
-        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+        nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
 
     List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
 
@@ -433,7 +468,7 @@ public class DoFnOperator<InputT, OutputT>
     }
     setPushedBackWatermark(min);
 
-    pushbackDoFnRunner.finishBundle();
+    checkInvokeFinishBundleByCount();
 
     // maybe output a new watermark
     processWatermark1(new Watermark(currentInputWatermark));
@@ -446,6 +481,9 @@ public class DoFnOperator<InputT, OutputT>
 
   @Override
   public void processWatermark1(Watermark mark) throws Exception {
+
+    checkInvokeStartBundle();
+
     // We do the check here because we are guaranteed to at least get the +Inf watermark on the
     // main input when the job finishes.
     if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
@@ -461,12 +499,9 @@ public class DoFnOperator<InputT, OutputT>
           Math.min(getPushbackWatermarkHold(), currentInputWatermark);
       if (potentialOutputWatermark > currentOutputWatermark) {
         setCurrentOutputWatermark(potentialOutputWatermark);
-        output.emitWatermark(new Watermark(currentOutputWatermark));
+        emitWatermark(currentOutputWatermark);
       }
     } else {
-      // fireTimers, so we need startBundle.
-      pushbackDoFnRunner.startBundle();
-
       setCurrentInputWatermark(mark.getTimestamp());
 
       // hold back by the pushed back values waiting for side inputs
@@ -474,7 +509,7 @@ public class DoFnOperator<InputT, OutputT>
 
       timerService.advanceWatermark(toFlinkRuntimeWatermark(pushedBackInputWatermark));
 
-      Instant watermarkHold = stateInternals.watermarkHold();
+      Instant watermarkHold = keyedStateInternals.watermarkHold();
 
       long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());
 
@@ -482,14 +517,23 @@ public class DoFnOperator<InputT, OutputT>
 
       if (potentialOutputWatermark > currentOutputWatermark) {
         setCurrentOutputWatermark(potentialOutputWatermark);
-        output.emitWatermark(new Watermark(currentOutputWatermark));
+        emitWatermark(currentOutputWatermark);
       }
-      pushbackDoFnRunner.finishBundle();
     }
   }
 
+  private void emitWatermark(long watermark) {
+    // Must invoke finishBundle before emitting the +Inf watermark, otherwise events may be late.
+    if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+      invokeFinishBundle();
+    }
+    output.emitWatermark(new Watermark(watermark));
+  }
+
   @Override
   public void processWatermark2(Watermark mark) throws Exception {
+    checkInvokeStartBundle();
+
     setCurrentSideInputWatermark(mark.getTimestamp());
     if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
       // this means we will never see any more side input
@@ -498,6 +542,7 @@ public class DoFnOperator<InputT, OutputT>
       // maybe output a new watermark
       processWatermark1(new Watermark(currentInputWatermark));
     }
+
   }
 
   /**
@@ -516,10 +561,9 @@ public class DoFnOperator<InputT, OutputT>
    * any future side input, i.e. that there is no point in waiting.
    */
   private void emitAllPushedBackData() throws Exception {
-    pushbackDoFnRunner.startBundle();
 
     BagState<WindowedValue<InputT>> pushedBack =
-        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+        nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
 
     Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
     if (pushedBackContents != null) {
@@ -537,11 +581,65 @@ public class DoFnOperator<InputT, OutputT>
 
     setPushedBackWatermark(Long.MAX_VALUE);
 
-    pushbackDoFnRunner.finishBundle();
+  }
+
+  /**
+   * Starts a new bundle if none is currently active. Before starting it, flushes the
+   * elements that were buffered while finishing a bundle in snapshotState().
+   *
+   * <p>To ensure that {@link DoFnRunner#processElement(WindowedValue)} and
+   * {@link DoFnRunner#onTimer(String, BoundedWindow, Instant, TimeDomain)} are always invoked
+   * between startBundle and finishBundle, this method must be called in every processElement,
+   * processWatermark and onProcessingTime. It need not be called in onEventTime, because
+   * processWatermark has already called it.
+   */
+  private void checkInvokeStartBundle() {
+    if (!bundleStarted) {
+      outputManager.flushBuffer();
+      pushbackDoFnRunner.startBundle();
+      bundleStarted = true;
+    }
+  }
+
+  /**
+   * Checks whether to invoke finishBundle based on the element count. Called in processElement.
+   */
+  private void checkInvokeFinishBundleByCount() {
+    elementCount++;
+    if (elementCount >= maxBundleSize) {
+      invokeFinishBundle();
+    }
+  }
+
+  /**
+   * Checks whether to invoke finishBundle based on the elapsed time.
+   */
+  private void checkInvokeFinishBundleByTime() {
+    long now = getProcessingTimeService().getCurrentProcessingTime();
+    if (now - lastFinishBundleTime >= maxBundleTimeMills) {
+      invokeFinishBundle();
+    }
+  }
+
+  private void invokeFinishBundle() {
+    if (bundleStarted) {
+      pushbackDoFnRunner.finishBundle();
+      bundleStarted = false;
+      elementCount = 0L;
+      lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
+    }
   }
 
   @Override
   public void snapshotState(StateSnapshotContext context) throws Exception {
+
+    // Force-finish the bundle at the checkpoint barrier, otherwise data may be lost.
+    // Careful: the outputs are stored in OperatorState or KeyGroupState, so this
+    // must be called before those states are snapshotted.
+    outputManager.openBuffer();
+    invokeFinishBundle();
+    outputManager.closeBuffer();
+
     // copy from AbstractStreamOperator
     if (getKeyedStateBackend() != null) {
       KeyedStateCheckpointOutputStream out;
@@ -587,8 +685,8 @@ public class DoFnOperator<InputT, OutputT>
 
   @Override
   public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
-    if (!sideInputs.isEmpty() && keyCoder != null) {
-      ((FlinkKeyGroupStateInternals) pushbackStateInternals).snapshotKeyGroupState(
+    if (keyCoder != null) {
+      ((FlinkKeyGroupStateInternals) nonKeyedStateInternals).snapshotKeyGroupState(
           keyGroupIndex, out);
     }
   }
@@ -626,23 +724,26 @@ public class DoFnOperator<InputT, OutputT>
 
   @Override
   public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception {
-    if (!sideInputs.isEmpty() && keyCoder != null) {
-      if (pushbackStateInternals == null) {
-        pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
+    if (keyCoder != null) {
+      if (nonKeyedStateInternals == null) {
+        nonKeyedStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
             getKeyedStateBackend());
       }
-      ((FlinkKeyGroupStateInternals) pushbackStateInternals)
+      ((FlinkKeyGroupStateInternals) nonKeyedStateInternals)
           .restoreKeyGroupState(keyGroupIndex, in, getUserCodeClassloader());
     }
   }
 
   @Override
   public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception {
+    // We don't have to call checkInvokeStartBundle() because it's already called in
+    // processWatermark*().
     fireTimer(timer);
   }
 
   @Override
   public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception {
+    checkInvokeStartBundle();
     fireTimer(timer);
   }
 
@@ -670,71 +771,186 @@ public class DoFnOperator<InputT, OutputT>
   }
 
   /**
-   * Factory for creating an {@link DoFnRunners.OutputManager} from
+   * Factory for creating a {@link BufferedOutputManager} from
    * a Flink {@link Output}.
    */
   interface OutputManagerFactory<OutputT> extends Serializable {
-    DoFnRunners.OutputManager create(Output<StreamRecord<WindowedValue<OutputT>>> output);
+    BufferedOutputManager<OutputT> create(
+        Output<StreamRecord<WindowedValue<OutputT>>> output,
+        StateInternals stateInternals);
   }
 
   /**
-   * Default implementation of {@link OutputManagerFactory} that creates an
-   * {@link DoFnRunners.OutputManager} that only writes to
-   * a single logical output.
+   * A {@link DoFnRunners.OutputManager} that can buffer its outputs.
+   * Uses {@link FlinkSplitStateInternals} or {@link FlinkKeyGroupStateInternals}
+   * to hold the buffered data.
    */
-  public static class DefaultOutputManagerFactory<OutputT>
-      implements OutputManagerFactory<OutputT> {
+  public static class BufferedOutputManager<OutputT> implements
+      DoFnRunners.OutputManager {
+
+    private TupleTag<OutputT> mainTag;
+    private Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
+    private Map<TupleTag<?>, Integer> tagsToIds;
+    private Map<Integer, TupleTag<?>> idsToTags;
+    protected Output<StreamRecord<WindowedValue<OutputT>>> output;
+
+    private boolean openBuffer = false;
+    private BagState<KV<Integer, WindowedValue<?>>> bufferState;
+
+    BufferedOutputManager(
+        Output<StreamRecord<WindowedValue<OutputT>>> output,
+        TupleTag<OutputT> mainTag,
+        Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
+        final Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders,
+        Map<TupleTag<?>, Integer> tagsToIds,
+        StateInternals stateInternals) {
+      this.output = output;
+      this.mainTag = mainTag;
+      this.tagsToOutputTags = tagsToOutputTags;
+      this.tagsToIds = tagsToIds;
+      this.idsToTags = new HashMap<>();
+      for (Map.Entry<TupleTag<?>, Integer> entry : tagsToIds.entrySet()) {
+        idsToTags.put(entry.getValue(), entry.getKey());
+      }
+
+      ImmutableMap.Builder<Integer, Coder<WindowedValue<?>>> idsToCodersBuilder =
+          ImmutableMap.builder();
+      for (Map.Entry<TupleTag<?>, Integer> entry : tagsToIds.entrySet()) {
+        idsToCodersBuilder.put(entry.getValue(), tagsToCoders.get(entry.getKey()));
+      }
+
+      StateTag<BagState<KV<Integer, WindowedValue<?>>>> bufferTag =
+          StateTags.bag("bundle-buffer-tag",
+              new TaggedKvCoder(idsToCodersBuilder.build()));
+      bufferState = stateInternals.state(StateNamespaces.global(), bufferTag);
+    }
+
+    void openBuffer() {
+      this.openBuffer = true;
+    }
+
+    void closeBuffer() {
+      this.openBuffer = false;
+    }
+
     @Override
-    public DoFnRunners.OutputManager create(
-        final Output<StreamRecord<WindowedValue<OutputT>>> output) {
-      return new DoFnRunners.OutputManager() {
-        @Override
-        public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
-          // with tagged outputs we can't get around this because we don't
-          // know our own output type...
-          @SuppressWarnings("unchecked")
-          WindowedValue<OutputT> castValue = (WindowedValue<OutputT>) value;
-          output.collect(new StreamRecord<>(castValue));
-        }
-      };
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
+      if (!openBuffer) {
+        emit(tag, value);
+      } else {
+        bufferState.add(KV.<Integer, WindowedValue<?>>of(tagsToIds.get(tag), value));
+      }
+    }
+
+    /**
+     * Flushes the elements of bufferState to the Flink Output. This method must not be invoked
+     * in {@link #snapshotState(StateSnapshotContext)}.
+     */
+    void flushBuffer() {
+      for (KV<Integer, WindowedValue<?>> taggedElem : bufferState.read()) {
+        emit(idsToTags.get(taggedElem.getKey()), (WindowedValue) taggedElem.getValue());
+      }
+      bufferState.clear();
+    }
+
+    private <T> void emit(TupleTag<T> tag, WindowedValue<T> value) {
+      if (tag.equals(mainTag)) {
+        // with tagged outputs we can't get around this because we don't
+        // know our own output type...
+        @SuppressWarnings("unchecked")
+        WindowedValue<OutputT> castValue = (WindowedValue<OutputT>) value;
+        output.collect(new StreamRecord<>(castValue));
+      } else {
+        @SuppressWarnings("unchecked")
+        OutputTag<WindowedValue<T>> outputTag = (OutputTag) tagsToOutputTags.get(tag);
+        output.collect(outputTag, new StreamRecord<>(value));
+      }
+    }
+  }
+
+  /**
+   * Coder for a KV of tag id and value. It will be serialized in the Flink checkpoint.
+   */
+  private static class TaggedKvCoder extends StructuredCoder<KV<Integer, WindowedValue<?>>> {
+
+    private Map<Integer, Coder<WindowedValue<?>>> idsToCoders;
+
+    TaggedKvCoder(Map<Integer, Coder<WindowedValue<?>>> idsToCoders) {
+      this.idsToCoders = idsToCoders;
+    }
+
+    @Override
+    public void encode(KV<Integer, WindowedValue<?>> kv, OutputStream out)
+        throws IOException {
+      Coder<WindowedValue<?>> coder = idsToCoders.get(kv.getKey());
+      VarIntCoder.of().encode(kv.getKey(), out);
+      coder.encode(kv.getValue(), out);
+    }
+
+    @Override
+    public KV<Integer, WindowedValue<?>> decode(InputStream in)
+        throws IOException {
+      Integer id = VarIntCoder.of().decode(in);
+      Coder<WindowedValue<?>> coder = idsToCoders.get(id);
+      WindowedValue<?> value = coder.decode(in);
+      return KV.<Integer, WindowedValue<?>>of(id, value);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return new ArrayList<>(idsToCoders.values());
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      for (Coder<?> coder : idsToCoders.values()) {
+        verifyDeterministic(this, "Coder must be deterministic", coder);
+      }
     }
   }
 
   /**
    * Implementation of {@link OutputManagerFactory} that creates an
-   * {@link DoFnRunners.OutputManager} that can write to multiple logical
-   * outputs by unioning them in a {@link RawUnionValue}.
+   * {@link BufferedOutputManager} that can write to multiple logical
+   * outputs using Flink side outputs.
    */
   public static class MultiOutputOutputManagerFactory<OutputT>
       implements OutputManagerFactory<OutputT> {
 
-    private TupleTag<?> mainTag;
-    Map<TupleTag<?>, OutputTag<WindowedValue<?>>> mapping;
+    private TupleTag<OutputT> mainTag;
+    private Map<TupleTag<?>, Integer> tagsToIds;
+    private Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
+    private Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders;
+
+    // Convenience constructor for the case where there is no side output.
+    @SuppressWarnings("unchecked")
+    public MultiOutputOutputManagerFactory(
+        TupleTag<OutputT> mainTag, Coder<WindowedValue<OutputT>> mainCoder) {
+      this(mainTag,
+          new HashMap<TupleTag<?>, OutputTag<WindowedValue<?>>>(),
+          ImmutableMap.<TupleTag<?>, Coder<WindowedValue<?>>>builder()
+              .put(mainTag, (Coder) mainCoder).build(),
+          ImmutableMap.<TupleTag<?>, Integer>builder()
+              .put(mainTag, 0).build());
+    }
 
     public MultiOutputOutputManagerFactory(
-        TupleTag<?> mainTag,
-        Map<TupleTag<?>, OutputTag<WindowedValue<?>>> mapping) {
+        TupleTag<OutputT> mainTag,
+        Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
+        Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders,
+        Map<TupleTag<?>, Integer> tagsToIds) {
       this.mainTag = mainTag;
-      this.mapping = mapping;
+      this.tagsToOutputTags = tagsToOutputTags;
+      this.tagsToCoders = tagsToCoders;
+      this.tagsToIds = tagsToIds;
     }
 
     @Override
-    public DoFnRunners.OutputManager create(
-        final Output<StreamRecord<WindowedValue<OutputT>>> output) {
-      return new DoFnRunners.OutputManager() {
-        @Override
-        public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
-          if (tag.equals(mainTag)) {
-            @SuppressWarnings("unchecked")
-            WindowedValue<OutputT> outputValue = (WindowedValue<OutputT>) value;
-            output.collect(new StreamRecord<>(outputValue));
-          } else {
-            @SuppressWarnings("unchecked")
-            OutputTag<WindowedValue<T>> outputTag = (OutputTag) mapping.get(tag);
-            output.<WindowedValue<T>>collect(outputTag, new StreamRecord<>(value));
-          }
-        }
-      };
+    public BufferedOutputManager<OutputT> create(
+        Output<StreamRecord<WindowedValue<OutputT>>> output,
+        StateInternals stateInternals) {
+      return new BufferedOutputManager<>(
+          output, mainTag, tagsToOutputTags, tagsToCoders, tagsToIds, stateInternals);
     }
   }
 
@@ -742,11 +958,11 @@ public class DoFnOperator<InputT, OutputT>
    * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow
    * accessing state or timer internals.
    */
-  protected class StepContext implements org.apache.beam.runners.core.StepContext {
+  protected class FlinkStepContext implements StepContext {
 
     @Override
     public StateInternals stateInternals() {
-      return stateInternals;
+      return keyedStateInternals;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index be758a6..b255bb4 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -97,7 +97,7 @@ public class SplittableDoFnOperator<
       public StateInternals stateInternalsForKey(String key) {
         //this will implicitly be keyed by the key of the incoming
         // element or by the key of a firing timer
-        return (StateInternals) stateInternals;
+        return (StateInternals) keyedStateInternals;
       }
     };
     TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() {
@@ -148,7 +148,7 @@ public class SplittableDoFnOperator<
   public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
     doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
         KeyedWorkItems.<String, KV<InputT, RestrictionT>>timersWorkItem(
-            (String) stateInternals.getKey(),
+            (String) keyedStateInternals.getKey(),
             Collections.singletonList(timer.getNamespace()))));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 78d585e..b1fb398 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -86,7 +86,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
       public StateInternals stateInternalsForKey(K key) {
         //this will implicitly be keyed by the key of the incoming
         // element or by the key of a firing timer
-        return (StateInternals) stateInternals;
+        return (StateInternals) keyedStateInternals;
       }
     };
     TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() {
@@ -112,7 +112,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
   public void fireTimer(InternalTimer<?, TimerData> timer) {
     doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
         KeyedWorkItems.<K, InputT>timersWorkItem(
-            (K) stateInternals.getKey(),
+            (K) keyedStateInternals.getKey(),
             Collections.singletonList(timer.getNamespace()))));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index bb2a9ff..09e59fd 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -167,7 +167,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals {
     @Override
     public void add(T input) {
       try {
-        flinkStateBackend.getOperatorState(descriptor).add(input);
+        flinkStateBackend.getListState(descriptor).add(input);
       } catch (Exception e) {
         throw new RuntimeException("Error updating state.", e);
       }
@@ -181,7 +181,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals {
     @Override
     public Iterable<T> read() {
       try {
-        Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
+        Iterable<T> result = flinkStateBackend.getListState(descriptor).get();
         return result != null ? result : Collections.<T>emptyList();
       } catch (Exception e) {
         throw new RuntimeException("Error updating state.", e);
@@ -194,7 +194,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals {
         @Override
         public Boolean read() {
           try {
-            Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
+            Iterable<T> result = flinkStateBackend.getListState(descriptor).get();
             // PartitionableListState.get() return empty collection When there is no element,
             // KeyedListState different. (return null)
             return result == null || Iterators.size(result.iterator()) == 0;
@@ -214,7 +214,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals {
     @Override
     public void clear() {
       try {
-        flinkStateBackend.getOperatorState(descriptor).clear();
+        flinkStateBackend.getListState(descriptor).clear();
       } catch (Exception e) {
         throw new RuntimeException("Error reading state.", e);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index eb06026..57086df 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink;
 import java.util.Collections;
 import java.util.HashMap;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -60,13 +61,15 @@ public class PipelineOptionsTest {
 
   @Test(expected = Exception.class)
   public void parDoBaseClassPipelineOptionsNullTest() {
-    new DoFnOperator<>(
+    TupleTag<String> mainTag = new TupleTag<>("main-output");
+    Coder<WindowedValue<String>> coder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
+    DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
         new TestDoFn(),
         "stepName",
-        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
-        new TupleTag<String>("main-output"),
+        coder,
+        mainTag,
         Collections.<TupleTag<?>>emptyList(),
-        new DoFnOperator.DefaultOutputManagerFactory<String>(),
+        new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, coder),
         WindowingStrategy.globalDefault(),
         new HashMap<Integer, PCollectionView<?>>(),
         Collections.<PCollectionView<?>>emptyList(),
@@ -81,13 +84,16 @@ public class PipelineOptionsTest {
   @Test
   public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
 
+    TupleTag<String> mainTag = new TupleTag<>("main-output");
+
+    Coder<WindowedValue<String>> coder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
     DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
         new TestDoFn(),
         "stepName",
-        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
-        new TupleTag<String>("main-output"),
+        coder,
+        mainTag,
         Collections.<TupleTag<?>>emptyList(),
-        new DoFnOperator.DefaultOutputManagerFactory<String>(),
+        new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, coder),
         WindowingStrategy.globalDefault(),
         new HashMap<Integer, PCollectionView<?>>(),
         Collections.<PCollectionView<?>>emptyList(),
@@ -105,7 +111,6 @@ public class PipelineOptionsTest {
     OneInputStreamOperatorTestHarness<WindowedValue<Object>, WindowedValue<Object>> testHarness =
         new OneInputStreamOperatorTestHarness<>(deserialized,
             typeInformation.createSerializer(new ExecutionConfig()));
-
     testHarness.open();
 
     // execute once to access options

http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 4d2a912..ad17de8 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -52,6 +52,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -62,6 +63,7 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -107,18 +109,17 @@ public class DoFnOperatorTest {
   @SuppressWarnings("unchecked")
   public void testSingleOutput() throws Exception {
 
-    WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
-        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
+    Coder<WindowedValue<String>> coder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
 
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
     DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
         new IdentityDoFn<String>(),
         "stepName",
-        windowedValueCoder,
+        coder,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),
-        new DoFnOperator.DefaultOutputManagerFactory(),
+        new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
         WindowingStrategy.globalDefault(),
         new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
         Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -143,26 +144,38 @@ public class DoFnOperatorTest {
   @SuppressWarnings("unchecked")
   public void testMultiOutputOutput() throws Exception {
 
-    WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
+    WindowedValue.ValueOnlyWindowedValueCoder<String> coder =
         WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
 
     TupleTag<String> mainOutput = new TupleTag<>("main-output");
     TupleTag<String> additionalOutput1 = new TupleTag<>("output-1");
     TupleTag<String> additionalOutput2 = new TupleTag<>("output-2");
-    ImmutableMap<TupleTag<?>, OutputTag<?>> outputMapping =
+    ImmutableMap<TupleTag<?>, OutputTag<?>> tagsToOutputTags =
         ImmutableMap.<TupleTag<?>, OutputTag<?>>builder()
-            .put(mainOutput, new OutputTag<String>(mainOutput.getId()){})
             .put(additionalOutput1, new OutputTag<String>(additionalOutput1.getId()){})
             .put(additionalOutput2, new OutputTag<String>(additionalOutput2.getId()){})
             .build();
+    ImmutableMap<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders =
+        ImmutableMap.<TupleTag<?>, Coder<WindowedValue<?>>>builder()
+            .put(mainOutput, (Coder) coder)
+            .put(additionalOutput1, coder)
+            .put(additionalOutput2, coder)
+            .build();
+    ImmutableMap<TupleTag<?>, Integer> tagsToIds =
+        ImmutableMap.<TupleTag<?>, Integer>builder()
+            .put(mainOutput, 0)
+            .put(additionalOutput1, 1)
+            .put(additionalOutput2, 2)
+            .build();
 
     DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
         new MultiOutputDoFn(additionalOutput1, additionalOutput2),
         "stepName",
-        windowedValueCoder,
+        coder,
         mainOutput,
         ImmutableList.<TupleTag<?>>of(additionalOutput1, additionalOutput2),
-        new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, outputMapping),
+        new DoFnOperator.MultiOutputOutputManagerFactory(
+            mainOutput, tagsToOutputTags, tagsToCoders, tagsToIds),
         WindowingStrategy.globalDefault(),
         new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
         Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -184,13 +197,13 @@ public class DoFnOperatorTest {
             WindowedValue.valueInGlobalWindow("got: hello")));
 
     assertThat(
-        this.stripStreamRecord(testHarness.getSideOutput(outputMapping.get(additionalOutput1))),
+        this.stripStreamRecord(testHarness.getSideOutput(tagsToOutputTags.get(additionalOutput1))),
         contains(
             WindowedValue.valueInGlobalWindow("extra: one"),
             WindowedValue.valueInGlobalWindow("got: hello")));
 
     assertThat(
-        this.stripStreamRecord(testHarness.getSideOutput(outputMapping.get(additionalOutput2))),
+        this.stripStreamRecord(testHarness.getSideOutput(tagsToOutputTags.get(additionalOutput2))),
         contains(
             WindowedValue.valueInGlobalWindow("extra: two"),
             WindowedValue.valueInGlobalWindow("got: hello")));
@@ -255,7 +268,7 @@ public class DoFnOperatorTest {
         inputCoder,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),
-        new DoFnOperator.DefaultOutputManagerFactory<String>(),
+        new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, outputCoder),
         windowingStrategy,
         new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
         Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -329,20 +342,20 @@ public class DoFnOperatorTest {
       }
     };
 
-    WindowedValue.FullWindowedValueCoder<Integer> windowedValueCoder =
-        WindowedValue.getFullCoder(
-            VarIntCoder.of(),
-            windowingStrategy.getWindowFn().windowCoder());
+    Coder<WindowedValue<Integer>> inputCoder = WindowedValue.getFullCoder(
+        VarIntCoder.of(), windowingStrategy.getWindowFn().windowCoder());
+    Coder<WindowedValue<String>> outputCoder = WindowedValue.getFullCoder(
+        StringUtf8Coder.of(), windowingStrategy.getWindowFn().windowCoder());
 
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
     DoFnOperator<Integer, String> doFnOperator = new DoFnOperator<>(
         fn,
         "stepName",
-        windowedValueCoder,
+        inputCoder,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),
-        new DoFnOperator.DefaultOutputManagerFactory<String>(),
+        new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, outputCoder),
         windowingStrategy,
         new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
         Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -441,7 +454,7 @@ public class DoFnOperatorTest {
           }
         };
 
-    WindowedValue.FullWindowedValueCoder<KV<String, Integer>> windowedValueCoder =
+    WindowedValue.FullWindowedValueCoder<KV<String, Integer>> coder =
         WindowedValue.getFullCoder(
             KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
             windowingStrategy.getWindowFn().windowCoder());
@@ -452,10 +465,10 @@ public class DoFnOperatorTest {
         new DoFnOperator<>(
             fn,
             "stepName",
-            windowedValueCoder,
+            coder,
             outputTag,
             Collections.<TupleTag<?>>emptyList(),
-            new DoFnOperator.DefaultOutputManagerFactory<KV<String, Integer>>(),
+            new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
             windowingStrategy,
             new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
             Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -531,8 +544,7 @@ public class DoFnOperatorTest {
 
   public void testSideInputs(boolean keyed) throws Exception {
 
-    WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
-        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
+    Coder<WindowedValue<String>> coder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
 
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
@@ -550,10 +562,10 @@ public class DoFnOperatorTest {
     DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
         new IdentityDoFn<String>(),
         "stepName",
-        windowedValueCoder,
+        coder,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),
-        new DoFnOperator.DefaultOutputManagerFactory<String>(),
+        new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
         WindowingStrategy.globalDefault(),
         sideInputMapping, /* side-input mapping */
         ImmutableList.<PCollectionView<?>>of(view1, view2), /* side inputs */
@@ -631,6 +643,105 @@ public class DoFnOperatorTest {
     testSideInputs(true);
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testBundle() throws Exception {
+
+    WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
+        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
+
+    TupleTag<String> outputTag = new TupleTag<>("main-output");
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setMaxBundleSize(2L);
+    options.setMaxBundleTimeMills(10L);
+
+    IdentityDoFn<String> doFn = new IdentityDoFn<String>() {
+      @FinishBundle
+      public void finishBundle(FinishBundleContext context) {
+        context.output(
+            "finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
+      }
+    };
+
+    DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
+        new DoFnOperator.MultiOutputOutputManagerFactory(
+            outputTag,
+            WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE));
+
+    DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+        doFn,
+        "stepName",
+        windowedValueCoder,
+        outputTag,
+        Collections.<TupleTag<?>>emptyList(),
+        outputManagerFactory,
+        WindowingStrategy.globalDefault(),
+        new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+        Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+        options,
+        null);
+
+    OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
+        new OneInputStreamOperatorTestHarness<>(doFnOperator);
+
+    testHarness.open();
+
+    testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("a")));
+    testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("b")));
+    testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("c")));
+
+    // draw a snapshot
+    OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+    // snapshot() finishes the open bundle. Whatever that finishBundle emits is buffered
+    // rather than sent downstream, so it is absent from the output checked here.
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(
+            WindowedValue.valueInGlobalWindow("a"),
+            WindowedValue.valueInGlobalWindow("b"),
+            WindowedValue.valueInGlobalWindow("finishBundle"),
+            WindowedValue.valueInGlobalWindow("c")));
+
+    testHarness.close();
+
+    DoFnOperator<String, String> newDoFnOperator = new DoFnOperator<>(
+        doFn,
+        "stepName",
+        windowedValueCoder,
+        outputTag,
+        Collections.<TupleTag<?>>emptyList(),
+        outputManagerFactory,
+        WindowingStrategy.globalDefault(),
+        new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+        Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+        options,
+        null);
+
+    OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> newHarness =
+        new OneInputStreamOperatorTestHarness<>(newDoFnOperator);
+
+    // restore snapshot
+    newHarness.initializeState(snapshot);
+
+    newHarness.open();
+
+    // startBundle will output the buffered elements.
+    newHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("d")));
+
+    // check finishBundle by timeout
+    newHarness.setProcessingTime(10);
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(newHarness.getOutput()),
+        contains(
+            WindowedValue.valueInGlobalWindow("finishBundle"),
+            WindowedValue.valueInGlobalWindow("d"),
+            WindowedValue.valueInGlobalWindow("finishBundle")));
+
+    newHarness.close();
+  }
+
   private <T> Iterable<WindowedValue<T>> stripStreamRecordFromWindowedValue(
       Iterable<Object> input) {
 


[2/2] beam git commit: This closes #3368

Posted by pe...@apache.org.
This closes #3368


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/724eda37
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/724eda37
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/724eda37

Branch: refs/heads/master
Commit: 724eda37ea1e54aac089d89c711ca3cee14a4603
Parents: 3a8b0b6 ceec7ce
Author: Pei He <pe...@apache.org>
Authored: Wed Aug 16 11:46:49 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Wed Aug 16 11:46:49 2017 +0800

----------------------------------------------------------------------
 .../runners/flink/FlinkPipelineOptions.java     |  11 +
 .../FlinkStreamingTransformTranslators.java     |  77 ++--
 .../wrappers/streaming/DoFnOperator.java        | 412 ++++++++++++++-----
 .../streaming/SplittableDoFnOperator.java       |   4 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   4 +-
 .../state/FlinkSplitStateInternals.java         |   8 +-
 .../beam/runners/flink/PipelineOptionsTest.java |  21 +-
 .../flink/streaming/DoFnOperatorTest.java       | 161 ++++++--
 8 files changed, 535 insertions(+), 163 deletions(-)
----------------------------------------------------------------------