Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:49 UTC

[22/28] beam git commit: Revert "[BEAM-2610] This closes #3553"

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index 365b6c4..c0919b9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -64,9 +64,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
                         c.element()[0] = 'b';
                       }
                     }));
-    PCollection<Long> consumer = pcollection.apply(Count.<byte[]>globally());
-    DirectGraphs.performDirectOverrides(p);
-    this.consumer = DirectGraphs.getProducer(consumer);
+    consumer = DirectGraphs.getProducer(pcollection.apply(Count.<byte[]>globally()));
   }
 
   @Test

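Note on the hunk above: the forward change (now reverted) had the test apply the DirectRunner's transform overrides before resolving the producer of the counted PCollection; the revert restores a direct lookup on the graph as applied. A minimal sketch of the two setup styles, assuming (as the test does) that the code lives in org.apache.beam.runners.direct so the package-private DirectGraphs helpers are visible:

    // Style removed by this revert: rewrite the graph with the runner's
    // overrides first, then resolve the producing AppliedPTransform.
    PCollection<Long> counted = pcollection.apply(Count.<byte[]>globally());
    DirectGraphs.performDirectOverrides(p);
    consumer = DirectGraphs.getProducer(counted);

    // Style restored by this revert: resolve the producer on the
    // un-rewritten graph in one step.
    consumer = DirectGraphs.getProducer(pcollection.apply(Count.<byte[]>globally()));
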
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 7912538..09a21ac 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -98,7 +98,7 @@ public class ParDoEvaluatorTest {
     when(evaluationContext.createBundle(output)).thenReturn(outputBundle);
 
     ParDoEvaluator<Integer> evaluator =
-        createEvaluator(singletonView, fn, inputPc, output);
+        createEvaluator(singletonView, fn, output);
 
     IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L));
     WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(3);
@@ -132,7 +132,6 @@ public class ParDoEvaluatorTest {
   private ParDoEvaluator<Integer> createEvaluator(
       PCollectionView<Integer> singletonView,
       RecorderFn fn,
-      PCollection<Integer> input,
       PCollection<Integer> output) {
     when(
             evaluationContext.createSideInputReader(
@@ -150,7 +149,6 @@ public class ParDoEvaluatorTest {
                 Mockito.any(AppliedPTransform.class), Mockito.any(StructuralKey.class)))
         .thenReturn(executionContext);
 
-    DirectGraphs.performDirectOverrides(p);
     @SuppressWarnings("unchecked")
     AppliedPTransform<PCollection<Integer>, ?, ?> transform =
         (AppliedPTransform<PCollection<Integer>, ?, ?>) DirectGraphs.getProducer(output);
@@ -158,7 +156,8 @@ public class ParDoEvaluatorTest {
         evaluationContext,
         stepContext,
         transform,
-        input.getWindowingStrategy(),
+        ((PCollection<?>) Iterables.getOnlyElement(transform.getInputs().values()))
+            .getWindowingStrategy(),
         fn,
         null /* key */,
         ImmutableList.<PCollectionView<?>>of(singletonView),

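The interesting piece of this hunk is where the windowing strategy comes from: rather than threading the input PCollection through createEvaluator, the reverted code recovers it from the AppliedPTransform itself. A sketch of that idiom (a fragment, not a full test; it assumes the transform has exactly one recorded input):

    // getInputs() maps TupleTags to input PValues; with exactly one entry,
    // Iterables.getOnlyElement returns it and throws otherwise.
    PCollection<?> mainInput =
        (PCollection<?>) Iterables.getOnlyElement(transform.getInputs().values());
    WindowingStrategy<?, ?> strategy = mainInput.getWindowingStrategy();
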
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index fe0b743..9366b7c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -41,7 +41,6 @@ import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -53,6 +52,7 @@ import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -128,17 +128,16 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
         input
             .apply(
                 new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
-                    new DoFn<KV<String, Integer>, Integer>() {
-                      @StateId(stateId)
-                      private final StateSpec<ValueState<String>> spec =
-                          StateSpecs.value(StringUtf8Coder.of());
-
-                      @ProcessElement
-                      public void process(ProcessContext c) {}
-                    },
-                    mainOutput,
-                    TupleTagList.empty(),
-                    Collections.<PCollectionView<?>>emptyList()))
+                    ParDo.of(
+                            new DoFn<KV<String, Integer>, Integer>() {
+                              @StateId(stateId)
+                              private final StateSpec<ValueState<String>> spec =
+                                  StateSpecs.value(StringUtf8Coder.of());
+
+                              @ProcessElement
+                              public void process(ProcessContext c) {}
+                            })
+                        .withOutputTags(mainOutput, TupleTagList.empty())))
             .get(mainOutput)
             .setCoder(VarIntCoder.of());
 
@@ -154,7 +153,8 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
     when(mockEvaluationContext.getExecutionContext(
             eq(producingTransform), Mockito.<StructuralKey>any()))
         .thenReturn(mockExecutionContext);
-    when(mockExecutionContext.getStepContext(anyString())).thenReturn(mockStepContext);
+    when(mockExecutionContext.getStepContext(anyString()))
+        .thenReturn(mockStepContext);
 
     IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(9));
     IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(19));
@@ -241,17 +241,18 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
         mainInput
             .apply(
                 new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
-                    new DoFn<KV<String, Integer>, Integer>() {
-                      @StateId(stateId)
-                      private final StateSpec<ValueState<String>> spec =
-                          StateSpecs.value(StringUtf8Coder.of());
-
-                      @ProcessElement
-                      public void process(ProcessContext c) {}
-                    },
-                    mainOutput,
-                    TupleTagList.empty(),
-                    Collections.<PCollectionView<?>>singletonList(sideInput)))
+                    ParDo
+                        .of(
+                            new DoFn<KV<String, Integer>, Integer>() {
+                              @StateId(stateId)
+                              private final StateSpec<ValueState<String>> spec =
+                                  StateSpecs.value(StringUtf8Coder.of());
+
+                              @ProcessElement
+                              public void process(ProcessContext c) {}
+                            })
+                        .withSideInputs(sideInput)
+                        .withOutputTags(mainOutput, TupleTagList.empty())))
             .get(mainOutput)
             .setCoder(VarIntCoder.of());
 
@@ -268,7 +269,8 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
     when(mockEvaluationContext.getExecutionContext(
             eq(producingTransform), Mockito.<StructuralKey>any()))
         .thenReturn(mockExecutionContext);
-    when(mockExecutionContext.getStepContext(anyString())).thenReturn(mockStepContext);
+    when(mockExecutionContext.getStepContext(anyString()))
+        .thenReturn(mockStepContext);
     when(mockEvaluationContext.createBundle(Matchers.<PCollection<Integer>>any()))
         .thenReturn(mockUncommittedBundle);
     when(mockStepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty());
@@ -285,8 +287,11 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
     // global window state merely by having the evaluator created. The cleanup logic does not
     // depend on the window.
     String key = "hello";
-    WindowedValue<KV<String, Integer>> firstKv =
-        WindowedValue.of(KV.of(key, 1), new Instant(3), firstWindow, PaneInfo.NO_FIRING);
+    WindowedValue<KV<String, Integer>> firstKv = WindowedValue.of(
+        KV.of(key, 1),
+        new Instant(3),
+        firstWindow,
+        PaneInfo.NO_FIRING);
 
     WindowedValue<KeyedWorkItem<String, KV<String, Integer>>> gbkOutputElement =
         firstKv.withValue(
@@ -301,8 +306,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
         BUNDLE_FACTORY
             .createBundle(
                 (PCollection<KeyedWorkItem<String, KV<String, Integer>>>)
-                    Iterables.getOnlyElement(
-                        TransformInputs.nonAdditionalInputs(producingTransform)))
+                    Iterables.getOnlyElement(producingTransform.getInputs().values()))
             .add(gbkOutputElement)
             .commit(Instant.now());
     TransformEvaluator<KeyedWorkItem<String, KV<String, Integer>>> evaluator =
@@ -312,7 +316,8 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
 
     // This should push back every element as a KV<String, Iterable<Integer>>
     // in the appropriate window. Since the keys are equal they are single-threaded
-    TransformResult<KeyedWorkItem<String, KV<String, Integer>>> result = evaluator.finishBundle();
+    TransformResult<KeyedWorkItem<String, KV<String, Integer>>> result =
+        evaluator.finishBundle();
 
     List<Integer> pushedBackInts = new ArrayList<>();
 

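Two reverts are folded into this file. First, GbkThenStatefulParDo goes back to taking the DoFn, output tags, and side inputs as separate arguments instead of a pre-built ParDo transform; second, the test bundle's input PCollection is again taken from producingTransform.getInputs() rather than TransformInputs.nonAdditionalInputs. A side-by-side sketch of the two constructor shapes (both internal to the DirectRunner, copied from the hunks above):

    // Shape removed by the revert: wrap the DoFn in a ParDo first.
    new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
        ParDo.of(fn)
            .withSideInputs(sideInput)
            .withOutputTags(mainOutput, TupleTagList.empty()));

    // Shape restored by the revert: pass the pieces individually.
    new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
        fn, mainOutput, TupleTagList.empty(),
        Collections.<PCollectionView<?>>singletonList(sideInput));
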
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index b7f5a7c..86412a0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -25,8 +25,6 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -92,7 +90,6 @@ public class TransformExecutorTest {
     created = p.apply(Create.of("foo", "spam", "third"));
     PCollection<KV<Integer, String>> downstream = created.apply(WithKeys.<Integer, String>of(3));
 
-    DirectGraphs.performDirectOverrides(p);
     DirectGraph graph = DirectGraphs.getGraph(p);
     createdProducer = graph.getProducer(created);
     downstreamProducer = graph.getProducer(downstream);
@@ -417,13 +414,8 @@ public class TransformExecutorTest {
               ? Collections.emptyList()
               : result.getUnprocessedElements();
 
-      Optional<? extends CommittedBundle<?>> unprocessedBundle;
-      if (inputBundle == null || Iterables.isEmpty(unprocessedElements)) {
-        unprocessedBundle = Optional.absent();
-      } else {
-        unprocessedBundle =
-            Optional.<CommittedBundle<?>>of(inputBundle.withElements(unprocessedElements));
-      }
+      CommittedBundle<?> unprocessedBundle =
+          inputBundle == null ? null : inputBundle.withElements(unprocessedElements);
       return CommittedResult.create(
           result,
           unprocessedBundle,

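This hunk tracks a signature change in CommittedResult.create: the forward change passed the unprocessed inputs as a Guava Optional<? extends CommittedBundle<?>>, while the reverted API takes a @Nullable CommittedBundle<?>. Note the revert also drops the Iterables.isEmpty check, so an input bundle whose elements were all processed now yields a bundle with an empty element list rather than an absent value:

    // Reverted convention: null means "no input bundle at all"; a bundle
    // with an empty element list means "nothing left unprocessed".
    CommittedBundle<?> unprocessedBundle =
        inputBundle == null ? null : inputBundle.withElements(unprocessedElements);
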
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 5bc48b7..419698e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -65,13 +66,12 @@ public class ViewEvaluatorFactoryTest {
             .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
             .apply(GroupByKey.<Void, String>create())
             .apply(Values.<Iterable<String>>create());
-    PCollection<Iterable<String>> view =
-        concat.apply(
-            new ViewOverrideFactory.WriteView<String, Iterable<String>>(createView.getView()));
+    PCollectionView<Iterable<String>> view =
+        concat.apply(new ViewOverrideFactory.WriteView<>(createView));
 
     EvaluationContext context = mock(EvaluationContext.class);
     TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
-    when(context.createPCollectionViewWriter(concat, createView.getView())).thenReturn(viewWriter);
+    when(context.createPCollectionViewWriter(concat, view)).thenReturn(viewWriter);
 
     CommittedBundle<String> inputBundle = bundleFactory.createBundle(input).commit(Instant.now());
     AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(view);

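The reverted WriteView is constructed from the CreatePCollectionView transform itself (not its extracted view), and applying it yields the PCollectionView directly, which the mock wiring then reuses:

    // Applying WriteView now returns the view, so the same object can be
    // handed to the mocked EvaluationContext.
    PCollectionView<Iterable<String>> view =
        concat.apply(new ViewOverrideFactory.WriteView<>(createView));
    when(context.createPCollectionViewWriter(concat, view)).thenReturn(viewWriter);
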
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 6af9273..024e15c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -37,11 +37,8 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.transforms.ViewFn;
-import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
@@ -66,19 +63,24 @@ public class ViewOverrideFactoryTest implements Serializable {
     PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
     final PCollectionView<List<Integer>> view =
         PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
-    PTransformReplacement<PCollection<Integer>, PCollection<Integer>>
+    PTransformReplacement<PCollection<Integer>, PCollectionView<List<Integer>>>
         replacementTransform =
             factory.getReplacementTransform(
                 AppliedPTransform
-                    .<PCollection<Integer>, PCollection<Integer>,
-                        PTransform<PCollection<Integer>, PCollection<Integer>>>
+                    .<PCollection<Integer>, PCollectionView<List<Integer>>,
+                        CreatePCollectionView<Integer, List<Integer>>>
                         of(
                             "foo",
                             ints.expand(),
                             view.expand(),
                             CreatePCollectionView.<Integer, List<Integer>>of(view),
                             p));
-    ints.apply(replacementTransform.getTransform());
+    PCollectionView<List<Integer>> afterReplacement =
+        ints.apply(replacementTransform.getTransform());
+    assertThat(
+        "The CreatePCollectionView replacement should return the same View",
+        afterReplacement,
+        equalTo(view));
 
     PCollection<Set<Integer>> outputViewContents =
         p.apply("CreateSingleton", Create.of(0))
@@ -102,11 +104,11 @@ public class ViewOverrideFactoryTest implements Serializable {
     final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
     final PCollectionView<List<Integer>> view =
         PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
-    PTransformReplacement<PCollection<Integer>, PCollection<Integer>> replacement =
+    PTransformReplacement<PCollection<Integer>, PCollectionView<List<Integer>>> replacement =
         factory.getReplacementTransform(
             AppliedPTransform
-                .<PCollection<Integer>, PCollection<Integer>,
-                    PTransform<PCollection<Integer>, PCollection<Integer>>>
+                .<PCollection<Integer>, PCollectionView<List<Integer>>,
+                    CreatePCollectionView<Integer, List<Integer>>>
                     of(
                         "foo",
                         ints.expand(),
@@ -124,19 +126,8 @@ public class ViewOverrideFactoryTest implements Serializable {
                   "There should only be one WriteView primitive in the graph",
                   writeViewVisited.getAndSet(true),
                   is(false));
-              PCollectionView<?> replacementView = ((WriteView) node.getTransform()).getView();
-
-              // replacementView.getPCollection() is null, but that is not a requirement
-              // so not asserted one way or the other
-              assertThat(
-                  replacementView.getTagInternal(),
-                  equalTo(view.getTagInternal()));
-              assertThat(
-                  replacementView.getViewFn(),
-                  Matchers.<ViewFn<?, ?>>equalTo(view.getViewFn()));
-              assertThat(
-                  replacementView.getWindowMappingFn(),
-                  Matchers.<WindowMappingFn<?>>equalTo(view.getWindowMappingFn()));
+              PCollectionView replacementView = ((WriteView) node.getTransform()).getView();
+              assertThat(replacementView, Matchers.<PCollectionView>theInstance(view));
               assertThat(node.getInputs().entrySet(), hasSize(1));
             }
           }

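Because the reverted replacement returns the original PCollectionView unchanged, the test can assert object identity instead of comparing the view's tag, ViewFn, and WindowMappingFn field by field:

    // Identity assertion restored by the revert; strictly stronger than
    // the three equalTo checks it replaces.
    assertThat(replacementView, Matchers.<PCollectionView>theInstance(view));
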
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
index 1d8aac1..b667346 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
@@ -59,7 +59,6 @@ public class WatermarkCallbackExecutorTest {
   public void setup() {
     PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
     PCollection<Integer> summed = created.apply(Sum.integersGlobally());
-    DirectGraphs.performDirectOverrides(p);
     DirectGraph graph = DirectGraphs.getGraph(p);
     create = graph.getProducer(created);
     sum = graph.getProducer(summed);

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index e3f6215..9528ac9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -24,7 +24,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -122,7 +121,6 @@ public class WatermarkManagerTest implements Serializable {
     flattened = preFlatten.apply("flattened", Flatten.<Integer>pCollections());
 
     clock = MockClock.fromInstant(new Instant(1000));
-    DirectGraphs.performDirectOverrides(p);
     graph = DirectGraphs.getGraph(p);
 
     manager = WatermarkManager.create(clock, graph);
@@ -319,7 +317,7 @@ public class WatermarkManagerTest implements Serializable {
         TimerUpdate.empty(),
         CommittedResult.create(
             StepTransformResult.withoutHold(graph.getProducer(created)).build(),
-            Optional.<CommittedBundle<?>>absent(),
+            root.withElements(Collections.<WindowedValue<Void>>emptyList()),
             Collections.singleton(createBundle),
             EnumSet.allOf(OutputType.class)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -333,7 +331,7 @@ public class WatermarkManagerTest implements Serializable {
         TimerUpdate.empty(),
         CommittedResult.create(
             StepTransformResult.withoutHold(theFlatten).build(),
-            Optional.<CommittedBundle<?>>absent(),
+            createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList(),
             EnumSet.allOf(OutputType.class)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -346,7 +344,7 @@ public class WatermarkManagerTest implements Serializable {
         TimerUpdate.empty(),
         CommittedResult.create(
             StepTransformResult.withoutHold(theFlatten).build(),
-            Optional.<CommittedBundle<?>>absent(),
+            createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList(),
             EnumSet.allOf(OutputType.class)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1502,15 +1500,9 @@ public class WatermarkManagerTest implements Serializable {
       AppliedPTransform<?, ?, ?> transform,
       @Nullable CommittedBundle<?> unprocessedBundle,
       Iterable<? extends CommittedBundle<?>> bundles) {
-    Optional<? extends CommittedBundle<?>> unprocessedElements;
-    if (unprocessedBundle == null || Iterables.isEmpty(unprocessedBundle.getElements())) {
-      unprocessedElements = Optional.absent();
-    } else {
-      unprocessedElements = Optional.of(unprocessedBundle);
-    }
     return CommittedResult.create(
         StepTransformResult.withoutHold(transform).build(),
-        unprocessedElements,
+        unprocessedBundle,
         bundles,
         Iterables.isEmpty(bundles)
             ? EnumSet.noneOf(OutputType.class)

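Same CommittedResult.create signature change as in TransformExecutorTest above: where the forward change passed Optional.<CommittedBundle<?>>absent(), the reverted call sites model "no unprocessed input" as the input bundle with an empty element list (or plain null, in the helper at the bottom of the hunk). For example, copied from the hunk:

    CommittedResult result = CommittedResult.create(
        StepTransformResult.withoutHold(theFlatten).build(),
        createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
        Collections.<CommittedBundle<?>>emptyList(),
        EnumSet.allOf(OutputType.class));
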
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 546a181..a88d95e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -30,7 +30,6 @@ import static org.junit.Assert.assertThat;
 import java.io.File;
 import java.io.FileReader;
 import java.io.Reader;
-import java.io.Serializable;
 import java.nio.CharBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -39,8 +38,9 @@ import java.util.UUID;
 import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.DynamicFileDestinations;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.LocalResources;
 import org.apache.beam.sdk.io.TextIO;
@@ -53,8 +53,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -73,17 +71,11 @@ import org.junit.runners.JUnit4;
  * Tests for {@link WriteWithShardingFactory}.
  */
 @RunWith(JUnit4.class)
-public class WriteWithShardingFactoryTest implements Serializable {
-
+public class WriteWithShardingFactoryTest {
   private static final int INPUT_SIZE = 10000;
-
-  @Rule public transient TemporaryFolder tmp = new TemporaryFolder();
-
-  private transient WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>();
-
-  @Rule
-  public final transient TestPipeline p =
-      TestPipeline.create().enableAbandonedNodeEnforcement(false);
+  @Rule public TemporaryFolder tmp = new TemporaryFolder();
+  private WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>();
+  @Rule public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
   @Test
   public void dynamicallyReshardedWrite() throws Exception {
@@ -137,24 +129,26 @@ public class WriteWithShardingFactoryTest implements Serializable {
   @Test
   public void withNoShardingSpecifiedReturnsNewTransform() {
     ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */);
-
-    PTransform<PCollection<Object>, PDone> original =
+    FilenamePolicy policy =
+        DefaultFilenamePolicy.constructUsingStandardParameters(
+            StaticValueProvider.of(outputDirectory),
+            DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
+            "",
+            false);
+    WriteFiles<Object> original =
         WriteFiles.to(
-            new FileBasedSink<Object, Void>(
-                StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) {
+            new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) {
               @Override
-              public WriteOperation<Object, Void> createWriteOperation() {
+              public WriteOperation<Object> createWriteOperation() {
                 throw new IllegalArgumentException("Should not be used");
               }
-            },
-            SerializableFunctions.identity());
+            });
     @SuppressWarnings("unchecked")
     PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
 
-    AppliedPTransform<PCollection<Object>, PDone, PTransform<PCollection<Object>, PDone>>
-        originalApplication =
-            AppliedPTransform.of(
-                "write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
+    AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
+        AppliedPTransform.of(
+            "write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
 
     assertThat(
         factory.getReplacementTransform(originalApplication).getTransform(),

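This hunk reverts the FileBasedSink API from the two-type-parameter form with DynamicFileDestinations (plus an identity format function on WriteFiles.to) back to a single type parameter with an explicit FilenamePolicy. A condensed sketch of the restored construction; the String and boolean arguments are passed positionally in the hunk, so their meanings below are an assumption:

    FilenamePolicy policy =
        DefaultFilenamePolicy.constructUsingStandardParameters(
            StaticValueProvider.of(outputDirectory),
            DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
            "",     // presumably the filename suffix
            false); // presumably windowed writes disabled
    WriteFiles<Object> original =
        WriteFiles.to(
            new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) {
              @Override
              public WriteOperation<Object> createWriteOperation() {
                throw new IllegalArgumentException("Should not be used");
              }
            });
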
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index c063a2d..c4c6b55 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -91,6 +91,7 @@
                   <excludedGroups>
                     org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
                     org.apache.beam.sdk.testing.LargeKeys$Above100MB,
+                    org.apache.beam.sdk.testing.UsesSetState,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics,
                     org.apache.beam.sdk.testing.UsesTestStream,
                     org.apache.beam.sdk.testing.UsesSplittableParDo
@@ -380,13 +381,5 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-core-java</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
deleted file mode 100644
index 0cc3aec..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.common.collect.Iterables;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.construction.ReplacementOutputs;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/** Flink streaming overrides for various view (side input) transforms. */
-class CreateStreamingFlinkView<ElemT, ViewT>
-    extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
-  private final PCollectionView<ViewT> view;
-
-  public CreateStreamingFlinkView(PCollectionView<ViewT> view) {
-    this.view = view;
-  }
-
-  @Override
-  public PCollection<ElemT> expand(PCollection<ElemT> input) {
-    input
-        .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
-        .apply(CreateFlinkPCollectionView.<ElemT, ViewT>of(view));
-    return input;
-  }
-
-  /**
-   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
-   *
-   * <p>For internal use by {@link CreateStreamingFlinkView}. This combiner requires that the input
-   * {@link PCollection} fits in memory. For a large {@link PCollection} this is expected to crash!
-   *
-   * @param <T> the type of elements to concatenate.
-   */
-  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
-    @Override
-    public List<T> createAccumulator() {
-      return new ArrayList<T>();
-    }
-
-    @Override
-    public List<T> addInput(List<T> accumulator, T input) {
-      accumulator.add(input);
-      return accumulator;
-    }
-
-    @Override
-    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
-      List<T> result = createAccumulator();
-      for (List<T> accumulator : accumulators) {
-        result.addAll(accumulator);
-      }
-      return result;
-    }
-
-    @Override
-    public List<T> extractOutput(List<T> accumulator) {
-      return accumulator;
-    }
-
-    @Override
-    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-
-    @Override
-    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-  }
-
-  /**
-   * Creates a primitive {@link PCollectionView}.
-   *
-   * <p>For internal use only by runner implementors.
-   *
-   * @param <ElemT> The type of the elements of the input PCollection
-   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
-   */
-  public static class CreateFlinkPCollectionView<ElemT, ViewT>
-      extends PTransform<PCollection<List<ElemT>>, PCollection<List<ElemT>>> {
-    private PCollectionView<ViewT> view;
-
-    private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
-      this.view = view;
-    }
-
-    public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of(
-        PCollectionView<ViewT> view) {
-      return new CreateFlinkPCollectionView<>(view);
-    }
-
-    @Override
-    public PCollection<List<ElemT>> expand(PCollection<List<ElemT>> input) {
-      return PCollection.<List<ElemT>>createPrimitiveOutputInternal(
-              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-          .setCoder(input.getCoder());
-    }
-
-    public PCollectionView<ViewT> getView() {
-      return view;
-    }
-  }
-
-  public static class Factory<ElemT, ViewT>
-      implements PTransformOverrideFactory<
-          PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> {
-    public Factory() {}
-
-    @Override
-    public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
-        AppliedPTransform<
-                PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>>
-            transform) {
-      return PTransformReplacement.of(
-          (PCollection<ElemT>) Iterables.getOnlyElement(transform.getInputs().values()),
-          new CreateStreamingFlinkView<ElemT, ViewT>(transform.getTransform().getView()));
-    }
-
-    @Override
-    public Map<PValue, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PValue> outputs, PCollection<ElemT> newOutput) {
-      return ReplacementOutputs.singleton(outputs, newOutput);
-    }
-  }
-}

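The deleted CreateStreamingFlinkView bundled the Concatenate CombineFn, which materializes an entire (in-memory-sized) PCollection into a single List per window before it is written out as a side input. A minimal sketch of that pattern using only public SDK API and the Concatenate class exactly as defined in the deleted file above; the pipeline contents here are hypothetical:

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<Integer> ints = p.apply(Create.of(1, 2, 3));
    // withoutDefaults(): emit nothing for an empty window instead of an
    // empty accumulator, matching the deleted expand() implementation.
    PCollection<List<Integer>> asList =
        ints.apply(Combine.globally(new Concatenate<Integer>()).withoutDefaults());
    p.run().waitUntilFinish();
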
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
index 6e70198..0439119 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.flink;
 import com.google.common.collect.Iterables;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -144,7 +143,7 @@ class FlinkBatchTranslationContext {
 
   @SuppressWarnings("unchecked")
   <T extends PValue> T getInput(PTransform<T, ?> transform) {
-    return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
+    return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
   }
 
   Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) {

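The same one-line revert recurs across the Flink translation contexts (and in the DirectRunner tests above): TransformInputs.nonAdditionalInputs, which presumably filters out inputs recorded only as side inputs before taking the sole main input, is replaced by a plain lookup that assumes the input map holds exactly one entry:

    // Reverted form: every recorded input counts, and there must be exactly one.
    return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
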
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index d2a2016..fe5dd87 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -84,8 +84,6 @@ class FlinkPipelineExecutionEnvironment {
     this.flinkBatchEnv = null;
     this.flinkStreamEnv = null;
 
-    pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options.isStreaming()));
-
     PipelineTranslationOptimizer optimizer =
         new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index f733e2e..8da68c5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -17,18 +17,27 @@
  */
 package org.apache.beam.runners.flink;
 
+import com.google.common.collect.ImmutableList;
+import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.core.construction.UnconsumedReads;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PValue;
@@ -64,8 +73,54 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
   @Override
   public void translate(Pipeline pipeline) {
+    List<PTransformOverride> transformOverrides =
+        ImmutableList.<PTransformOverride>builder()
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.splittableParDoMulti(),
+                    new SplittableParDoOverrideFactory()))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
+                    new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsIterable.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsList.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsMap.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsMultimap.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner)))
+            // this has to be last since the ViewAsSingleton override
+            // can expand to a Combine.GloballyAsSingletonView
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class,
+                        flinkRunner)))
+            .build();
+
     // Ensure all outputs of all reads are consumed.
     UnconsumedReads.ensureAllReadsConsumed(pipeline);
+    pipeline.replaceAll(transformOverrides);
     super.translate(pipeline);
   }
 
@@ -173,6 +228,35 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
     }
   }
 
+  private static class ReflectiveOneToOneOverrideFactory<
+          InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
+      extends SingleInputOutputOverrideFactory<
+          PCollection<InputT>, PCollection<OutputT>, TransformT> {
+    private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement;
+    private final FlinkRunner runner;
+
+    private ReflectiveOneToOneOverrideFactory(
+        Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement,
+        FlinkRunner runner) {
+      this.replacement = replacement;
+      this.runner = runner;
+    }
+
+    @Override
+    public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(
+        AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          InstanceBuilder.ofType(replacement)
+              .withArg(FlinkRunner.class, runner)
+              .withArg(
+                  (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>)
+                      transform.getTransform().getClass(),
+                  transform.getTransform())
+              .build());
+    }
+  }
+
   /**
    * A {@link PTransformOverrideFactory} that overrides a <a
    * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with {@link SplittableParDo}.
@@ -188,7 +272,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
                 transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
-          (SplittableParDo<InputT, OutputT, ?>) SplittableParDo.forAppliedParDo(transform));
+          new SplittableParDo<>(transform.getTransform()));
     }
 
     @Override

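The bulk of this file's revert moves the Flink streaming overrides back inline into translate() (matching the removal of pipeline.replaceAll(FlinkTransformOverrides...) from FlinkPipelineExecutionEnvironment above) and restores the reflective one-to-one factory. The shape of one entry in the restored list, copied from the hunk: a PTransformOverride pairs a matcher (which applied transforms to replace) with a factory (what to build in their place):

    PTransformOverride override =
        PTransformOverride.of(
            PTransformMatchers.classEqualTo(View.AsIterable.class),
            new ReflectiveOneToOneOverrideFactory(
                FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner));

Ordering matters here: the Combine.GloballyAsSingletonView override is registered last because, as the inline comment notes, the ViewAsSingleton override can itself expand into one.
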
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 3d7e81f..2a7c5d6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.construction.ElementAndRestriction;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
@@ -123,7 +124,7 @@ class FlinkStreamingTransformTranslators {
     TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator());
     TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
     TRANSLATORS.put(
-        CreateStreamingFlinkView.CreateFlinkPCollectionView.class,
+        FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
         new CreateViewStreamingTranslator());
 
     TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming());
@@ -362,13 +363,8 @@ class FlinkStreamingTransformTranslators {
       Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags = Maps.newHashMap();
       for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
         if (!tagsToOutputTags.containsKey(entry.getKey())) {
-          tagsToOutputTags.put(
-              entry.getKey(),
-              new OutputTag<WindowedValue<?>>(
-                  entry.getKey().getId(),
-                  (TypeInformation) context.getTypeInfo((PCollection<?>) entry.getValue())
-              )
-          );
+          tagsToOutputTags.put(entry.getKey(), new OutputTag<>(entry.getKey().getId(),
+              (TypeInformation) context.getTypeInfo((PCollection<?>) entry.getValue())));
         }
       }
 
@@ -547,11 +543,14 @@ class FlinkStreamingTransformTranslators {
           transform.getAdditionalOutputTags().getAll(),
           context,
           new ParDoTranslationHelper.DoFnOperatorFactory<
-              KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>() {
+              KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
             @Override
-            public DoFnOperator<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
-                createDoFnOperator(
-                    DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn,
+            public DoFnOperator<
+                KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
+                OutputT> createDoFnOperator(
+                    DoFn<
+                        KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
+                        OutputT> doFn,
                     String stepName,
                     List<PCollectionView<?>> sideInputs,
                     TupleTag<OutputT> mainOutputTag,
@@ -559,8 +558,11 @@ class FlinkStreamingTransformTranslators {
                     FlinkStreamingTranslationContext context,
                     WindowingStrategy<?, ?> windowingStrategy,
                     Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
-                    Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>>
-                        inputCoder,
+                    Coder<
+                        WindowedValue<
+                            KeyedWorkItem<
+                                String,
+                                ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
                     Coder keyCoder,
                     Map<Integer, PCollectionView<?>> transformedSideInputs) {
               return new SplittableDoFnOperator<>(
@@ -582,17 +584,17 @@ class FlinkStreamingTransformTranslators {
 
   private static class CreateViewStreamingTranslator<ElemT, ViewT>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-      CreateStreamingFlinkView.CreateFlinkPCollectionView<ElemT, ViewT>> {
+      FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> {
 
     @Override
     public void translateNode(
-        CreateStreamingFlinkView.CreateFlinkPCollectionView<ElemT, ViewT> transform,
+        FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT> transform,
         FlinkStreamingTranslationContext context) {
       // just forward
       DataStream<WindowedValue<List<ElemT>>> inputDataSet =
           context.getInputDataStream(context.getInput(transform));
 
-      PCollectionView<ViewT> view = transform.getView();
+      PCollectionView<ViewT> view = context.getOutput(transform);
 
       context.setOutputDataStream(view, inputDataSet);
     }

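Alongside renaming CreateStreamingFlinkView back to FlinkStreamingViewOverrides.CreateFlinkPCollectionView (with the view now recovered via context.getOutput(transform) rather than transform.getView()), this hunk reverts the element type threaded through the splittable-DoFn operator from a plain KV back to the runners-core pair type:

    // Reverted element type flowing into SplittableDoFnOperator:
    KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>
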
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
index 74a5fb9..ea5f6b3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.collect.Iterables;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -114,7 +113,7 @@ class FlinkStreamingTranslationContext {
 
   @SuppressWarnings("unchecked")
   public <T extends PValue> T getInput(PTransform<T, ?> transform) {
-    return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
+    return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
   }
 
   public <T extends PInput> Map<TupleTag<?>, PValue> getInputs(PTransform<T, ?> transform) {

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
new file mode 100644
index 0000000..ce1c895
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
+
+/**
+ * Flink streaming overrides for various view (side input) transforms.
+ */
+class FlinkStreamingViewOverrides {
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+   * for the Flink runner in streaming mode.
+   */
+  static class StreamingViewAsMap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+    private final transient FlinkRunner runner;
+
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) {
+      this.runner = runner;
+    }
+
+    @Override
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, V>> view =
+          PCollectionViews.mapView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        runner.recordViewUsesNonDeterministicKeyCoder(this);
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMap";
+    }
+  }
+
+  /**
+   * Specialized expansion for {@link
+   * View.AsMultimap View.AsMultimap} for the
+   * Flink runner in streaming mode.
+   */
+  static class StreamingViewAsMultimap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+    private final transient FlinkRunner runner;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsMultimap(FlinkRunner runner, View.AsMultimap<K, V> transform) {
+      this.runner = runner;
+    }
+
+    @Override
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, Iterable<V>>> view =
+          PCollectionViews.multimapView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        runner.recordViewUsesNonDeterministicKeyCoder(this);
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMultimap";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link View.AsList View.AsList} for the
+   * Flink runner in streaming mode.
+   */
+  static class StreamingViewAsList<T>
+      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {}
+
+    @Override
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
+      PCollectionView<List<T>> view =
+          PCollectionViews.listView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<T, List<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsList";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link View.AsIterable View.AsIterable} for the
+   * Flink runner in streaming mode.
+   */
+  static class StreamingViewAsIterable<T>
+      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) {}
+
+    @Override
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+      PCollectionView<Iterable<T>> view =
+          PCollectionViews.iterableView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<T, Iterable<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsIterable";
+    }
+  }
+
+  /**
+   * Specialized expansion for
+   * {@link View.AsSingleton View.AsSingleton} for the
+   * Flink runner in streaming mode.
+   */
+  static class StreamingViewAsSingleton<T>
+      extends PTransform<PCollection<T>, PCollectionView<T>> {
+    private View.AsSingleton<T> transform;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsSingleton(FlinkRunner runner, View.AsSingleton<T> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<T> expand(PCollection<T> input) {
+      Combine.Globally<T, T> combine = Combine.globally(
+          new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+      if (!transform.hasDefaultValue()) {
+        combine = combine.withoutDefaults();
+      }
+      return input.apply(combine.asSingletonView());
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsSingleton";
+    }
+
+    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+      private boolean hasDefaultValue;
+      private T defaultValue;
+
+      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+        this.hasDefaultValue = hasDefaultValue;
+        this.defaultValue = defaultValue;
+      }
+
+      @Override
+      public T apply(T left, T right) {
+        throw new IllegalArgumentException("PCollection with more than one element "
+            + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+            + "combine the PCollection into a single value");
+      }
+
+      @Override
+      public T identity() {
+        if (hasDefaultValue) {
+          return defaultValue;
+        } else {
+          throw new IllegalArgumentException(
+              "Empty PCollection accessed as a singleton view. "
+                  + "Consider setting withDefault to provide a default value");
+        }
+      }
+    }
+  }
+
+  static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
+      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+    Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingCombineGloballyAsSingletonView(
+        FlinkRunner runner,
+        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+      PCollection<OutputT> combined =
+          input.apply(Combine.globally(transform.getCombineFn())
+              .withoutDefaults()
+              .withFanout(transform.getFanout()));
+
+      PCollectionView<OutputT> view = PCollectionViews.singletonView(
+          combined,
+          combined.getWindowingStrategy(),
+          transform.getInsertDefault(),
+          transform.getInsertDefault()
+              ? transform.getCombineFn().defaultValue() : null,
+          combined.getCoder());
+      return combined
+          .apply(ParDo.of(new WrapAsList<OutputT>()))
+          .apply(CreateFlinkPCollectionView.<OutputT, OutputT>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingCombineGloballyAsSingletonView";
+    }
+  }
+
+  private static class WrapAsList<T> extends DoFn<T, List<T>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(Collections.singletonList(c.element()));
+    }
+  }
+
+  /**
+   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+   *
+   * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
+   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
+   * They require that the input {@link PCollection} fit in memory.
+   * For a large {@link PCollection} this is expected to crash!
+   *
+   * @param <T> the type of elements to concatenate.
+   */
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<T>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
+
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+  }
+
+  /**
+   * Creates a primitive {@link PCollectionView}.
+   *
+   * <p>For internal use only by runner implementors.
+   *
+   * @param <ElemT> The type of the elements of the input PCollection
+   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+   */
+  public static class CreateFlinkPCollectionView<ElemT, ViewT>
+      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+    private PCollectionView<ViewT> view;
+
+    private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
+      this.view = view;
+    }
+
+    public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of(
+        PCollectionView<ViewT> view) {
+      return new CreateFlinkPCollectionView<>(view);
+    }
+
+    @Override
+    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+      return view;
+    }
+  }
+}

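Taken together, the restored overrides above all expand to the same two-step
shape: gather each window's contents into a single List, then bind that List
to the view. A sketch for an Iterable-typed side input, using only names
defined in the new file (the pipeline wiring is illustrative, not part of the
patch; p is an existing Pipeline and Create is
org.apache.beam.sdk.transforms.Create):

    // Concatenate collects all elements of a window into one List; the
    // primitive CreateFlinkPCollectionView then associates that List with
    // the view the streaming translator materializes as a side input.
    PCollection<String> words = p.apply(Create.of("a", "b", "c"));
    PCollectionView<Iterable<String>> view =
        PCollectionViews.iterableView(
            words, words.getWindowingStrategy(), words.getCoder());
    words
        .apply(Combine.globally(new Concatenate<String>()).withoutDefaults())
        .apply(CreateFlinkPCollectionView.<String, Iterable<String>>of(view));
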
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
deleted file mode 100644
index 1dc8de9..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
-import org.apache.beam.runners.core.construction.PTransformMatchers;
-import org.apache.beam.runners.core.construction.SplittableParDo;
-import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.View;
-
-/**
- * {@link PTransform} overrides for Flink runner.
- */
-public class FlinkTransformOverrides {
-  public static List<PTransformOverride> getDefaultOverrides(boolean streaming) {
-    if (streaming) {
-      return ImmutableList.<PTransformOverride>builder()
-          .add(
-              PTransformOverride.of(
-                  PTransformMatchers.splittableParDoMulti(),
-                  new FlinkStreamingPipelineTranslator.SplittableParDoOverrideFactory()))
-          .add(
-              PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
-                  new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
-          .add(
-              PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
-                  new CreateStreamingFlinkView.Factory()))
-          .build();
-    } else {
-      return ImmutableList.of();
-    }
-  }
-}

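The deleted class above was the single list a translator handed to the SDK's
replacement hook. A sketch of the assumed call site (not shown in this diff;
Pipeline.replaceAll is the SDK entry point for applying transform overrides
before translation):

    // Applied once, before translating the user pipeline to a Flink job:
    List<PTransformOverride> overrides =
        FlinkTransformOverrides.getDefaultOverrides(/* streaming */ true);
    pipeline.replaceAll(overrides);
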
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 2f095d4..5d08eba 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -35,6 +35,7 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.core.construction.ElementAndRestriction;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -42,7 +43,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -55,15 +55,18 @@ import org.joda.time.Instant;
  * the {@code @ProcessElement} method of a splittable {@link DoFn}.
  */
 public class SplittableDoFnOperator<
-        InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
-    extends DoFnOperator<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> {
+    InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+    extends DoFnOperator<
+    KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
 
   private transient ScheduledExecutorService executorService;
 
   public SplittableDoFnOperator(
-      DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn,
+      DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> doFn,
       String stepName,
-      Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>> inputCoder,
+      Coder<
+          WindowedValue<
+              KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
       OutputManagerFactory<OutputT> outputManagerFactory,
@@ -84,6 +87,7 @@ public class SplittableDoFnOperator<
         sideInputs,
         options,
         keyCoder);
+
   }
 
   @Override
@@ -147,7 +151,7 @@ public class SplittableDoFnOperator<
   @Override
   public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
     doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
-        KeyedWorkItems.<String, KV<InputT, RestrictionT>>timersWorkItem(
+        KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
             (String) stateInternals.getKey(),
             Collections.singletonList(timer.getNamespace()))));
   }
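
The remaining hunks are a mechanical payload swap: every KeyedWorkItem that
carried KV<InputT, RestrictionT> before the revert now carries
ElementAndRestriction<InputT, RestrictionT> again. A sketch of the shape
difference at a construction site, assuming the of(...) factory on the
AutoValue wrapper and using an Integer restriction purely for illustration:

    // Pre-revert: element and restriction travel as a generic pair.
    KV<String, Integer> asKv = KV.of("someElement", 42);
    // Post-revert: the dedicated runners-core-construction wrapper returns.
    ElementAndRestriction<String, Integer> asEar =
        ElementAndRestriction.of("someElement", 42);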