Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/06 16:40:56 UTC

[06/50] [abbrv] incubator-beam git commit: Improve Splittable DoFn

Improve Splittable DoFn

Makes Splittable DoFn (SDF) behave more like a real DoFn:
- Adds support for side inputs and outputs to SDF
- Teaches `ProcessFn` to work with exploded windows inside the
  `KeyedWorkItem`. It works with them by un-exploding the windows
  in the `Iterable<WindowedValue<ElementAndRestriction>>` into a
  single `WindowedValue`, since the values and timestamps are
  guaranteed to be the same.
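
A minimal sketch of the un-exploding step described above, assuming a
simplified stand-in type. `Windowed` and `ImplodeWindowsSketch` are
hypothetical names used only for illustration; they are not Beam's
`WindowedValue` or the `implodeWindows` helper in this commit. The
consistency check on value and timestamp is also an addition of the sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified stand-in for a windowed value (not Beam's class). */
final class Windowed<T> {
  final T value;
  final long timestampMillis;
  final List<String> windows; // window names stand in for real window objects

  Windowed(T value, long timestampMillis, List<String> windows) {
    this.value = value;
    this.timestampMillis = timestampMillis;
    this.windows = Collections.unmodifiableList(new ArrayList<>(windows));
  }
}

final class ImplodeWindowsSketch {
  /** Merges per-window copies of one value into a single value carrying all windows. */
  static <T> Windowed<T> implode(Iterable<Windowed<T>> values) {
    Windowed<T> first = null;
    List<String> windows = new ArrayList<>();
    for (Windowed<T> v : values) {
      if (first == null) {
        first = v;
      } else if (!first.value.equals(v.value)
          || first.timestampMillis != v.timestampMillis) {
        // Extra safety check added by this sketch only.
        throw new IllegalStateException("expected copies of the same value and timestamp");
      }
      windows.addAll(v.windows);
    }
    if (first == null) {
      throw new IllegalStateException("no elements to implode");
    }
    return new Windowed<>(first.value, first.timestampMillis, windows);
  }
}
```

Because every copy carries the same value and timestamp, only the window
sets need to be concatenated; the first copy supplies everything else.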

Makes SplittableParDo.ProcessFn not use the (now unavailable)
OldDoFn state and timers API:
- Makes `ProcessFn` be a primitive transform with its own
  `ParDoEvaluator`. As a nice side effect, this enables the runner to
  provide additional hooks into it - e.g. for giving the runner access
  to the restriction tracker (in later PRs)
- For consistency, moves declaration of `GBKIntoKeyedWorkItems`
  primitive transform into `SplittableParDo`, alongside the
  `SplittableProcessElements` transform
- Preserves the compressed representation of `WindowedValue`s in
  `PushbackSideInputDoFnRunner`
- Uses OutputWindowedValue in SplittableParDo.ProcessFn

Proper lifecycle management for wrapped fn.

- Caches underlying fn using DoFnLifecycleManager, so its
  @Setup and @Teardown methods are called.
- Calls @StartBundle and @FinishBundle methods on the underlying
  fn explicitly. Output from them is prohibited, since an SDF
  is only allowed to output after a successful RestrictionTracker.tryClaim.
  It's possible that an SDF should not be allowed to have
  StartBundle/FinishBundle methods at all, but I'm not sure.
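
One way to picture the output prohibition described above is a context
wrapper whose output method always throws. This is only a sketch:
`OutputContext` and `BundleOutputGuard` are hypothetical names, not Beam
types, and the real change wraps `DoFn.Context` instead.

```java
/** Hypothetical minimal output interface; stands in for a DoFn context. */
interface OutputContext<T> {
  void output(T value);
}

final class BundleOutputGuard {
  /** Returns a context that rejects every output attempt. */
  static <T> OutputContext<T> noOutputContext() {
    return value -> {
      throw new UnsupportedOperationException(
          "Splittable DoFn can only output from @ProcessElement");
    };
  }
}
```

A bundle-level method handed such a context can still run its own setup or
cleanup logic, but any call to `output` fails immediately.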


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87ff5ac3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87ff5ac3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87ff5ac3

Branch: refs/heads/gearpump-runner
Commit: 87ff5ac36bb9cc62fa4864ffa7b5a5e495b9a4a1
Parents: fd4b631
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Oct 26 16:05:01 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Dec 1 14:15:55 2016 -0800

----------------------------------------------------------------------
 .../core/ElementAndRestrictionCoder.java        |   8 +
 .../runners/core/GBKIntoKeyedWorkItems.java     |  55 ---
 .../beam/runners/core/SplittableParDo.java      | 378 +++++++++++++++----
 .../beam/runners/core/SplittableParDoTest.java  | 134 +++++--
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java |  41 +-
 .../beam/runners/direct/DirectGroupByKey.java   |   2 +-
 .../beam/runners/direct/DirectRunner.java       |   8 +-
 .../runners/direct/DoFnLifecycleManager.java    |   4 +-
 .../beam/runners/direct/ParDoEvaluator.java     |  26 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |  63 +++-
 .../direct/ParDoMultiOverrideFactory.java       |   2 +-
 ...littableProcessElementsEvaluatorFactory.java | 144 +++++++
 .../direct/TransformEvaluatorRegistry.java      |   5 +
 .../beam/runners/direct/SplittableDoFnTest.java | 194 +++++++++-
 .../org/apache/beam/sdk/transforms/DoFn.java    |  12 +
 .../apache/beam/sdk/transforms/DoFnTester.java  |  51 ++-
 .../sdk/util/state/TimerInternalsFactory.java   |  36 ++
 17 files changed, 905 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
index 6dec8e2..64c1e14 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -64,4 +64,12 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT>
     RestrictionT value = restrictionCoder.decode(inStream, context);
     return ElementAndRestriction.of(key, value);
   }
+
+  public Coder<ElementT> getElementCoder() {
+    return elementCoder;
+  }
+
+  public Coder<RestrictionT> getRestrictionCoder() {
+    return restrictionCoder;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
deleted file mode 100644
index 304e349..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItemCoder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Interface for creating a runner-specific {@link GroupByKey GroupByKey-like} {@link PTransform}
- * that produces {@link KeyedWorkItem KeyedWorkItems} so that downstream transforms can access state
- * and timers.
- */
-@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
-public class GBKIntoKeyedWorkItems<KeyT, InputT>
-    extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
-  @Override
-  public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
-    checkArgument(input.getCoder() instanceof KvCoder,
-        "Expected input coder to be KvCoder, but was %s",
-        input.getCoder().getClass().getSimpleName());
-
-    KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
-    Coder<KeyedWorkItem<KeyT, InputT>> coder = KeyedWorkItemCoder.of(
-        kvCoder.getKeyCoder(), kvCoder.getValueCoder(),
-        input.getWindowingStrategy().getWindowFn().windowCoder());
-    PCollection<KeyedWorkItem<KeyT, InputT>> collection = PCollection.createPrimitiveOutputInternal(
-        input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
-    collection.setCoder((Coder) coder);
-    return collection;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index c38ab2f..80fd17b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -19,17 +19,22 @@ package org.apache.beam.runners.core;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import java.util.List;
 import java.util.UUID;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -45,21 +50,30 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItemCoder;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.TimerInternalsFactory;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypedPValue;
 import org.joda.time.Instant;
 
 /**
@@ -80,31 +94,53 @@ import org.joda.time.Instant;
  * ParDo.of(splittable DoFn)}, but not for direct use by pipeline writers.
  */
 @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
-public class SplittableParDo<
-        InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
-    extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
-  private final DoFn<InputT, OutputT> fn;
-  private final DoFnSignature signature;
+public class SplittableParDo<InputT, OutputT, RestrictionT>
+    extends PTransform<PCollection<InputT>, PCollectionTuple> {
+  private final ParDo.BoundMulti<InputT, OutputT> parDo;
 
   /**
-   * Creates the transform for the given original {@link ParDo} and {@link DoFn}.
+   * Creates the transform for the given original multi-output {@link ParDo}.
    *
-   * @param fn The splittable {@link DoFn} inside the original {@link ParDo} transform.
+   * @param parDo The splittable {@link ParDo} transform.
    */
-  public SplittableParDo(DoFn<InputT, OutputT> fn) {
-    checkNotNull(fn, "fn must not be null");
-    this.fn = fn;
-    this.signature = DoFnSignatures.getSignature(fn.getClass());
-    checkArgument(signature.processElement().isSplittable(), "fn must be a splittable DoFn");
+  public SplittableParDo(ParDo.BoundMulti<InputT, OutputT> parDo) {
+    checkNotNull(parDo, "parDo must not be null");
+    this.parDo = parDo;
+    checkArgument(
+        DoFnSignatures.getSignature(parDo.getNewFn().getClass()).processElement().isSplittable(),
+        "fn must be a splittable DoFn");
   }
 
   @Override
-  public PCollection<OutputT> apply(PCollection<InputT> input) {
-    PCollection.IsBounded isFnBounded = signature.isBoundedPerElement();
+  public PCollectionTuple apply(PCollection<InputT> input) {
+    return applyTyped(input);
+  }
+
+  private PCollectionTuple applyTyped(PCollection<InputT> input) {
+    DoFn<InputT, OutputT> fn = parDo.getNewFn();
     Coder<RestrictionT> restrictionCoder =
-        DoFnInvokers
-            .invokerFor(fn)
+        DoFnInvokers.invokerFor(fn)
             .invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());
+    PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> keyedWorkItems =
+        applySplitIntoKeyedWorkItems(input, fn, restrictionCoder);
+    return keyedWorkItems.apply(
+        "Process",
+        new ProcessElements<>(
+            fn,
+            input.getCoder(),
+            restrictionCoder,
+            input.getWindowingStrategy(),
+            parDo.getSideInputs(),
+            parDo.getMainOutputTag(),
+            parDo.getSideOutputTags()));
+  }
+
+  private static <InputT, OutputT, RestrictionT>
+      PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+          applySplitIntoKeyedWorkItems(
+              PCollection<InputT> input,
+              DoFn<InputT, OutputT> fn,
+              Coder<RestrictionT> restrictionCoder) {
     Coder<ElementAndRestriction<InputT, RestrictionT>> splitCoder =
         ElementAndRestrictionCoder.of(input.getCoder(), restrictionCoder);
 
@@ -121,23 +157,133 @@ public class SplittableParDo<
                 WithKeys.of(new RandomUniqueKeyFn<ElementAndRestriction<InputT, RestrictionT>>()))
             .apply(
                 "Group by key",
-                new GBKIntoKeyedWorkItems<String, ElementAndRestriction<InputT, RestrictionT>>());
+                new GBKIntoKeyedWorkItems<String, ElementAndRestriction<InputT, RestrictionT>>())
+            .setCoder(
+                KeyedWorkItemCoder.of(
+                    StringUtf8Coder.of(),
+                    splitCoder,
+                    input.getWindowingStrategy().getWindowFn().windowCoder()));
     checkArgument(
         keyedWorkItems.getWindowingStrategy().getWindowFn() instanceof GlobalWindows,
         "GBKIntoKeyedWorkItems must produce a globally windowed collection, "
             + "but windowing strategy was: %s",
         keyedWorkItems.getWindowingStrategy());
-    return keyedWorkItems
-        .apply(
-            "Process",
-            ParDo.of(
-                new ProcessFn<InputT, OutputT, RestrictionT, TrackerT>(
-                    fn,
-                    input.getCoder(),
-                    restrictionCoder,
-                    input.getWindowingStrategy().getWindowFn().windowCoder())))
-        .setIsBoundedInternal(input.isBounded().and(isFnBounded))
-        .setWindowingStrategyInternal(input.getWindowingStrategy());
+    return keyedWorkItems;
+  }
+
+  /**
+   * Runner-specific primitive {@link GroupByKey GroupByKey-like} {@link PTransform} that produces
+   * {@link KeyedWorkItem KeyedWorkItems} so that downstream transforms can access state and timers.
+   *
+   * <p>Unlike a real {@link GroupByKey}, ignores the input's windowing and triggering strategy and
+   * emits output immediately.
+   */
+  public static class GBKIntoKeyedWorkItems<KeyT, InputT>
+      extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
+    @Override
+    public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
+    }
+  }
+
+  /**
+   * Runner-specific primitive {@link PTransform} that invokes the {@link DoFn.ProcessElement}
+   * method for a splittable {@link DoFn}.
+   */
+  public static class ProcessElements<InputT, OutputT, RestrictionT>
+      extends PTransform<
+          PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>,
+          PCollectionTuple> {
+    private final DoFn<InputT, OutputT> fn;
+    private final Coder<InputT> elementCoder;
+    private final Coder<RestrictionT> restrictionCoder;
+    private final WindowingStrategy<?, ?> windowingStrategy;
+    private final List<PCollectionView<?>> sideInputs;
+    private final TupleTag<OutputT> mainOutputTag;
+    private final TupleTagList sideOutputTags;
+
+    /**
+     * @param fn the splittable {@link DoFn}.
+     * @param windowingStrategy the {@link WindowingStrategy} of the input collection.
+     * @param sideInputs list of side inputs that should be available to the {@link DoFn}.
+     * @param mainOutputTag {@link TupleTag Tag} of the {@link DoFn DoFn's} main output.
+     * @param sideOutputTags {@link TupleTagList Tags} of the {@link DoFn DoFn's} side outputs.
+     */
+    public ProcessElements(
+        DoFn<InputT, OutputT> fn,
+        Coder<InputT> elementCoder,
+        Coder<RestrictionT> restrictionCoder,
+        WindowingStrategy<?, ?> windowingStrategy,
+        List<PCollectionView<?>> sideInputs,
+        TupleTag<OutputT> mainOutputTag,
+        TupleTagList sideOutputTags) {
+      this.fn = fn;
+      this.elementCoder = elementCoder;
+      this.restrictionCoder = restrictionCoder;
+      this.windowingStrategy = windowingStrategy;
+      this.sideInputs = sideInputs;
+      this.mainOutputTag = mainOutputTag;
+      this.sideOutputTags = sideOutputTags;
+    }
+
+    public DoFn<InputT, OutputT> getFn() {
+      return fn;
+    }
+
+    public List<PCollectionView<?>> getSideInputs() {
+      return sideInputs;
+    }
+
+    public TupleTag<OutputT> getMainOutputTag() {
+      return mainOutputTag;
+    }
+
+    public TupleTagList getSideOutputTags() {
+      return sideOutputTags;
+    }
+
+    public ProcessFn<InputT, OutputT, RestrictionT, ?> newProcessFn(DoFn<InputT, OutputT> fn) {
+      return new SplittableParDo.ProcessFn<>(
+          fn, elementCoder, restrictionCoder, windowingStrategy.getWindowFn().windowCoder());
+    }
+
+    @Override
+    public PCollectionTuple apply(
+        PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+            input) {
+      DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+      PCollectionTuple outputs =
+          PCollectionTuple.ofPrimitiveOutputsInternal(
+              input.getPipeline(),
+              TupleTagList.of(mainOutputTag).and(sideOutputTags.getAll()),
+              windowingStrategy,
+              input.isBounded().and(signature.isBoundedPerElement()));
+
+      // Set output type descriptor similarly to how ParDo.BoundMulti does it.
+      outputs.get(mainOutputTag).setTypeDescriptorInternal(fn.getOutputTypeDescriptor());
+
+      return outputs;
+    }
+
+    @Override
+    public <T> Coder<T> getDefaultOutputCoder(
+        PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+            input,
+        TypedPValue<T> output)
+        throws CannotProvideCoderException {
+      // Similar logic to ParDo.BoundMulti.getDefaultOutputCoder.
+      @SuppressWarnings("unchecked")
+      KeyedWorkItemCoder<String, ElementAndRestriction<InputT, RestrictionT>> kwiCoder =
+          (KeyedWorkItemCoder) input.getCoder();
+      Coder<InputT> inputCoder =
+          ((ElementAndRestrictionCoder<InputT, RestrictionT>) kwiCoder.getElementCoder())
+              .getElementCoder();
+      return input
+          .getPipeline()
+          .getCoderRegistry()
+          .getDefaultCoder(output.getTypeDescriptor(), fn.getInputTypeDescriptor(), inputCoder);
+    }
   }
 
   /**
@@ -182,15 +328,11 @@ public class SplittableParDo<
    * The heart of splittable {@link DoFn} execution: processes a single (element, restriction) pair
    * by creating a tracker for the restriction and checkpointing/resuming processing later if
    * necessary.
-   *
-   * <p>TODO: This uses deprecated OldDoFn since DoFn does not provide access to state/timer
-   * internals. This should be rewritten to use the <a href="https://s.apache.org/beam-state">State
-   * and Timers API</a> once it is available.
    */
   @VisibleForTesting
-  static class ProcessFn<
+  public static class ProcessFn<
           InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
-      extends OldDoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
+      extends DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
     // Commit at least once every 10k output records.  This keeps the watermark advancing
     // smoothly, and ensures that not too much work will have to be reprocessed in the event of
     // a crash.
@@ -227,30 +369,56 @@ public class SplittableParDo<
      */
     private StateTag<Object, ValueState<RestrictionT>> restrictionTag;
 
+    private transient StateInternalsFactory<String> stateInternalsFactory;
+    private transient TimerInternalsFactory<String> timerInternalsFactory;
+    private transient OutputWindowedValue<OutputT> outputWindowedValue;
+
     private final DoFn<InputT, OutputT> fn;
     private final Coder<? extends BoundedWindow> windowCoder;
 
     private transient DoFnInvoker<InputT, OutputT> invoker;
 
-    ProcessFn(
+    public ProcessFn(
         DoFn<InputT, OutputT> fn,
         Coder<InputT> elementCoder,
         Coder<RestrictionT> restrictionCoder,
         Coder<? extends BoundedWindow> windowCoder) {
       this.fn = fn;
+      this.invoker = DoFnInvokers.invokerFor(fn);
       this.windowCoder = windowCoder;
-      elementTag =
+      this.elementTag =
           StateTags.value("element", WindowedValue.getFullCoder(elementCoder, this.windowCoder));
-      restrictionTag = StateTags.value("restriction", restrictionCoder);
+      this.restrictionTag = StateTags.value("restriction", restrictionCoder);
     }
 
-    @Override
-    public void setup() throws Exception {
-      invoker = DoFnInvokers.invokerFor(fn);
+    public void setStateInternalsFactory(StateInternalsFactory<String> stateInternalsFactory) {
+      this.stateInternalsFactory = stateInternalsFactory;
     }
 
-    @Override
+    public void setTimerInternalsFactory(TimerInternalsFactory<String> timerInternalsFactory) {
+      this.timerInternalsFactory = timerInternalsFactory;
+    }
+
+    public void setOutputWindowedValue(OutputWindowedValue<OutputT> outputWindowedValue) {
+      this.outputWindowedValue = outputWindowedValue;
+    }
+
+    @StartBundle
+    public void startBundle(Context c) throws Exception {
+      invoker.invokeStartBundle(wrapContext(c));
+    }
+
+    @FinishBundle
+    public void finishBundle(Context c) throws Exception {
+      invoker.invokeFinishBundle(wrapContext(c));
+    }
+
+    @ProcessElement
     public void processElement(final ProcessContext c) {
+      StateInternals<String> stateInternals =
+          stateInternalsFactory.stateInternalsForKey(c.element().key());
+      TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(c.element().key());
+
       // Initialize state (element and restriction) depending on whether this is the seed call.
       // The seed call is the first call for this element, which actually has the element.
       // Subsequent calls are timer firings and the element has to be retrieved from the state.
@@ -258,17 +426,23 @@ public class SplittableParDo<
       boolean isSeedCall = (timer == null);
       StateNamespace stateNamespace = isSeedCall ? StateNamespaces.global() : timer.getNamespace();
       ValueState<WindowedValue<InputT>> elementState =
-          c.windowingInternals().stateInternals().state(stateNamespace, elementTag);
+          stateInternals.state(stateNamespace, elementTag);
       ValueState<RestrictionT> restrictionState =
-          c.windowingInternals().stateInternals().state(stateNamespace, restrictionTag);
+          stateInternals.state(stateNamespace, restrictionTag);
       WatermarkHoldState<GlobalWindow> holdState =
-          c.windowingInternals().stateInternals().state(stateNamespace, watermarkHoldTag);
+          stateInternals.state(stateNamespace, watermarkHoldTag);
 
       ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
       if (isSeedCall) {
         // The element and restriction are available in c.element().
+        // elementsIterable() will, by construction of SplittableParDo, contain the same value
+        // potentially in several different windows. We implode this into a single WindowedValue
+        // in order to simplify the rest of the code and avoid iterating over elementsIterable()
+        // explicitly. The windows of this WindowedValue will be propagated to windows of the
+        // output. This is correct because a splittable DoFn is not allowed to inspect the window
+        // of its element.
         WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue =
-            Iterables.getOnlyElement(c.element().elementsIterable());
+            implodeWindows(c.element().elementsIterable());
         WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().element());
         elementState.write(element);
         elementAndRestriction =
@@ -290,7 +464,7 @@ public class SplittableParDo<
       DoFn.ProcessContinuation cont =
           invoker.invokeProcessElement(
               wrapTracker(
-                  tracker, makeContext(c, elementAndRestriction.element(), tracker, residual)));
+                  tracker, wrapContext(c, elementAndRestriction.element(), tracker, residual)));
       if (residual[0] == null) {
         // This means the call completed unsolicited, and the context produced by makeContext()
         // did not take a checkpoint. Take one now.
@@ -307,19 +481,85 @@ public class SplittableParDo<
       }
       restrictionState.write(residual[0]);
       Instant futureOutputWatermark = cont.getWatermark();
-      if (futureOutputWatermark != null) {
-        holdState.add(futureOutputWatermark);
+      if (futureOutputWatermark == null) {
+        futureOutputWatermark = elementAndRestriction.element().getTimestamp();
       }
+      Instant wakeupTime = timerInternals.currentProcessingTime().plus(cont.resumeDelay());
+      holdState.add(futureOutputWatermark);
       // Set a timer to continue processing this element.
-      TimerInternals timerInternals = c.windowingInternals().timerInternals();
       timerInternals.setTimer(
-          TimerInternals.TimerData.of(
-              stateNamespace,
-              timerInternals.currentProcessingTime().plus(cont.resumeDelay()),
-              TimeDomain.PROCESSING_TIME));
+          TimerInternals.TimerData.of(stateNamespace, wakeupTime, TimeDomain.PROCESSING_TIME));
+    }
+
+    /**
+     * Does the opposite of {@link WindowedValue#explodeWindows()} - creates a single {@link
+     * WindowedValue} from a collection of {@link WindowedValue}'s that is known to contain copies
+     * of the same value with the same timestamp, but different window sets.
+     *
+     * <p>This is only legal to do because we know that {@link RandomUniqueKeyFn} created unique
+     * keys for every {@link ElementAndRestriction}, so if there's multiple {@link WindowedValue}'s
+     * for the same key, that means only that the windows of that {@link ElementAndRestriction} are
+     * being delivered separately rather than all at once. It is also legal to do because splittable
+     * {@link DoFn} is not allowed to access the window of its element, so we can propagate the full
+     * set of windows of its input to its output.
+     */
+    private static <InputT, RestrictionT>
+        WindowedValue<ElementAndRestriction<InputT, RestrictionT>> implodeWindows(
+            Iterable<WindowedValue<ElementAndRestriction<InputT, RestrictionT>>> values) {
+      WindowedValue<ElementAndRestriction<InputT, RestrictionT>> first =
+          Iterables.getFirst(values, null);
+      checkState(first != null, "Got a KeyedWorkItem with no elements and no timers");
+      ImmutableList.Builder<BoundedWindow> windows = ImmutableList.builder();
+      for (WindowedValue<ElementAndRestriction<InputT, RestrictionT>> value : values) {
+        windows.addAll(value.getWindows());
+      }
+      return WindowedValue.of(
+          first.getValue(), first.getTimestamp(), windows.build(), first.getPane());
+    }
+
+    private DoFn<InputT, OutputT>.Context wrapContext(final Context baseContext) {
+      return fn.new Context() {
+        @Override
+        public PipelineOptions getPipelineOptions() {
+          return baseContext.getPipelineOptions();
+        }
+
+        @Override
+        public void output(OutputT output) {
+          throwUnsupportedOutput();
+        }
+
+        @Override
+        public void outputWithTimestamp(OutputT output, Instant timestamp) {
+          throwUnsupportedOutput();
+        }
+
+        @Override
+        public <T> void sideOutput(TupleTag<T> tag, T output) {
+          throwUnsupportedOutput();
+        }
+
+        @Override
+        public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+          throwUnsupportedOutput();
+        }
+
+        @Override
+        protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+            String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+          return fn.createAggregator(name, combiner);
+        }
+
+        private void throwUnsupportedOutput() {
+          throw new UnsupportedOperationException(
+              String.format(
+                  "Splittable DoFn can only output from @%s",
+                  ProcessElement.class.getSimpleName()));
+        }
+      };
     }
 
-    private DoFn<InputT, OutputT>.ProcessContext makeContext(
+    private DoFn<InputT, OutputT>.ProcessContext wrapContext(
         final ProcessContext baseContext,
         final WindowedValue<InputT> element,
         final TrackerT tracker,
@@ -340,17 +580,14 @@ public class SplittableParDo<
         }
 
         public void output(OutputT output) {
-          baseContext
-              .windowingInternals()
-              .outputWindowedValue(
-                  output, element.getTimestamp(), element.getWindows(), element.getPane());
+          outputWindowedValue.outputWindowedValue(
+              output, element.getTimestamp(), element.getWindows(), element.getPane());
           noteOutput();
         }
 
         public void outputWithTimestamp(OutputT output, Instant timestamp) {
-          baseContext
-              .windowingInternals()
-              .outputWindowedValue(output, timestamp, element.getWindows(), element.getPane());
+          outputWindowedValue.outputWindowedValue(
+              output, timestamp, element.getWindows(), element.getPane());
           noteOutput();
         }
 
@@ -370,17 +607,15 @@ public class SplittableParDo<
         }
 
         public <T> void sideOutput(TupleTag<T> tag, T output) {
-          // TODO: I'm not sure how to implement this correctly: there's no
-          // "internals.sideOutputWindowedValue".
-          throw new UnsupportedOperationException(
-              "Side outputs not yet supported by splittable DoFn");
+          outputWindowedValue.sideOutputWindowedValue(
+              tag, output, element.getTimestamp(), element.getWindows(), element.getPane());
+          noteOutput();
         }
 
         public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-          // TODO: I'm not sure how to implement this correctly: there's no
-          // "internals.sideOutputWindowedValue".
-          throw new UnsupportedOperationException(
-              "Side outputs not yet supported by splittable DoFn");
+          outputWindowedValue.sideOutputWindowedValue(
+              tag, output, timestamp, element.getWindows(), element.getPane());
+          noteOutput();
         }
 
         @Override
@@ -393,8 +628,7 @@ public class SplittableParDo<
 
     /**
      * Creates an {@link DoFnInvoker.ArgumentProvider} that provides the given tracker as well as
-     * the given
-     * {@link ProcessContext} (which is also provided when a {@link Context} is requested.
+     * the given {@link ProcessContext} (which is also provided when a {@link Context} is requested).
      */
     private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapTracker(
         TrackerT tracker, DoFn<InputT, OutputT>.ProcessContext processContext) {
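
Note on the hunks above: the wrapped context now sends its output through an injected callback (`outputWindowedValue`) rather than through `baseContext.windowingInternals()`, and side outputs go through the same callback. The following is a minimal, self-contained sketch of that shape and not the Beam code. `OutputSink`, `RecordingSink` and `WrappedContext` are hypothetical names, timestamps are plain longs, and windows and panes are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OutputCallbackSketch {
  /** Hypothetical, simplified stand-in for a runner-provided output callback. */
  public interface OutputSink<OutputT> {
    void output(OutputT value, long timestampMillis);

    <SideT> void sideOutput(String tag, SideT value, long timestampMillis);
  }

  /** Records what it receives, similar in spirit to the callback the test installs. */
  public static class RecordingSink<OutputT> implements OutputSink<OutputT> {
    public final List<OutputT> main = new ArrayList<>();
    public final Map<String, List<Object>> side = new HashMap<>();

    @Override
    public void output(OutputT value, long timestampMillis) {
      main.add(value);
    }

    @Override
    public <SideT> void sideOutput(String tag, SideT value, long timestampMillis) {
      side.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
    }
  }

  /** Forwards outputs to the sink with the element's timestamp and counts them. */
  public static class WrappedContext<OutputT> {
    private final OutputSink<OutputT> sink;
    private final long elementTimestampMillis;
    private int numOutputs = 0;

    public WrappedContext(OutputSink<OutputT> sink, long elementTimestampMillis) {
      this.sink = sink;
      this.elementTimestampMillis = elementTimestampMillis;
    }

    public void output(OutputT value) {
      sink.output(value, elementTimestampMillis);
      numOutputs++; // plays the role of noteOutput() in the diff
    }

    public <SideT> void sideOutput(String tag, SideT value) {
      sink.sideOutput(tag, value, elementTimestampMillis);
      numOutputs++;
    }

    public int numOutputs() {
      return numOutputs;
    }
  }

  public static void main(String[] args) {
    RecordingSink<String> sink = new RecordingSink<>();
    WrappedContext<String> context = new WrappedContext<>(sink, 1000L);
    context.output("42a");
    context.sideOutput("extra", 7);
    System.out.println(sink.main + " " + sink.side + " " + context.numOutputs());
  }
}
```

Because the sink is injected, a runner and a unit test can each supply their own implementation without the context needing access to runner internals.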

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 29ff838..990d892 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -29,6 +29,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
@@ -38,6 +39,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -47,8 +49,13 @@ import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.KeyedWorkItems;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.TimerInternalsFactory;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
@@ -120,6 +127,12 @@ public class SplittableParDoTest {
         .setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
   }
 
+  private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<String>() {};
+
+  private ParDo.BoundMulti<Integer, String> makeParDo(DoFn<Integer, String> fn) {
+    return ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty());
+  }
+
   @Test
   public void testBoundednessForBoundedFn() {
     Pipeline pipeline = TestPipeline.create();
@@ -128,14 +141,15 @@ public class SplittableParDoTest {
         "Applying a bounded SDF to a bounded collection produces a bounded collection",
         PCollection.IsBounded.BOUNDED,
         makeBoundedCollection(pipeline)
-            .apply("bounded to bounded", new SplittableParDo<>(boundedFn))
-            .isBounded());
+            .apply("bounded to bounded", new SplittableParDo<>(makeParDo(boundedFn)))
+            .get(MAIN_OUTPUT_TAG).isBounded());
     assertEquals(
         "Applying a bounded SDF to an unbounded collection produces an unbounded collection",
         PCollection.IsBounded.UNBOUNDED,
         makeUnboundedCollection(pipeline)
-            .apply("bounded to unbounded", new SplittableParDo<>(boundedFn))
-            .isBounded());
+            .apply(
+                "bounded to unbounded", new SplittableParDo<>(makeParDo(boundedFn)))
+            .get(MAIN_OUTPUT_TAG).isBounded());
   }
 
   @Test
@@ -146,18 +160,27 @@ public class SplittableParDoTest {
         "Applying an unbounded SDF to a bounded collection produces an unbounded collection",
         PCollection.IsBounded.UNBOUNDED,
         makeBoundedCollection(pipeline)
-            .apply("unbounded to bounded", new SplittableParDo<>(unboundedFn))
-            .isBounded());
+            .apply(
+                "unbounded to bounded",
+                new SplittableParDo<>(makeParDo(unboundedFn)))
+            .get(MAIN_OUTPUT_TAG).isBounded());
     assertEquals(
         "Applying an unbounded SDF to an unbounded collection produces an unbounded collection",
         PCollection.IsBounded.UNBOUNDED,
         makeUnboundedCollection(pipeline)
-            .apply("unbounded to unbounded", new SplittableParDo<>(unboundedFn))
-            .isBounded());
+            .apply(
+                "unbounded to unbounded",
+                new SplittableParDo<>(makeParDo(unboundedFn)))
+            .get(MAIN_OUTPUT_TAG).isBounded());
   }
 
   // ------------------------------- Tests for ProcessFn ---------------------------------
 
+  enum WindowExplosion {
+    EXPLODE_WINDOWS,
+    DO_NOT_EXPLODE_WINDOWS
+  }
+
   /**
    * A helper for testing {@link SplittableParDo.ProcessFn} on 1 element (but possibly over multiple
    * {@link DoFn.ProcessElement} calls).
@@ -179,6 +202,46 @@ public class SplittableParDoTest {
           new SplittableParDo.ProcessFn<>(
               fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
       this.tester = DoFnTester.of(processFn);
+      processFn.setStateInternalsFactory(
+          new StateInternalsFactory<String>() {
+            @Override
+            public StateInternals<String> stateInternalsForKey(String key) {
+              return tester.getStateInternals();
+            }
+          });
+      processFn.setTimerInternalsFactory(
+          new TimerInternalsFactory<String>() {
+            @Override
+            public TimerInternals timerInternalsForKey(String key) {
+              return tester.getTimerInternals();
+            }
+          });
+      processFn.setOutputWindowedValue(
+          new OutputWindowedValue<OutputT>() {
+            @Override
+            public void outputWindowedValue(
+                OutputT output,
+                Instant timestamp,
+                Collection<? extends BoundedWindow> windows,
+                PaneInfo pane) {
+              tester
+                  .getMutableOutput(tester.getMainOutputTag())
+                  .add(WindowedValue.of(output, timestamp, windows, pane));
+            }
+
+            @Override
+            public <SideOutputT> void sideOutputWindowedValue(
+                TupleTag<SideOutputT> tag,
+                SideOutputT output,
+                Instant timestamp,
+                Collection<? extends BoundedWindow> windows,
+                PaneInfo pane) {
+              tester.getMutableOutput(tag).add(WindowedValue.of(output, timestamp, windows, pane));
+            }
+          });
+      // Do not clone since ProcessFn references non-serializable DoFnTester itself
+      // through the state/timer/output callbacks.
+      this.tester.setCloningBehavior(DoFnTester.CloningBehavior.DO_NOT_CLONE);
       this.tester.startBundle();
       this.tester.advanceProcessingTime(currentProcessingTime);
 
@@ -192,12 +255,24 @@ public class SplittableParDoTest {
               ElementAndRestriction.of(element, restriction),
               currentProcessingTime,
               GlobalWindow.INSTANCE,
-              PaneInfo.ON_TIME_AND_ONLY_FIRING));
+              PaneInfo.ON_TIME_AND_ONLY_FIRING),
+          WindowExplosion.DO_NOT_EXPLODE_WINDOWS);
     }
 
-    void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue)
+    void startElement(
+        WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue,
+        WindowExplosion explosion)
         throws Exception {
-      tester.processElement(KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue)));
+      switch (explosion) {
+        case EXPLODE_WINDOWS:
+          tester.processElement(
+              KeyedWorkItems.elementsWorkItem("key", windowedValue.explodeWindows()));
+          break;
+        case DO_NOT_EXPLODE_WINDOWS:
+          tester.processElement(
+              KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue)));
+          break;
+      }
     }
 
     /**
@@ -253,9 +328,6 @@ public class SplittableParDoTest {
     DoFn<Integer, String> fn = new ToStringFn();
 
     Instant base = Instant.now();
-    ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
-        new ProcessFnTester<>(
-            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class));
 
     IntervalWindow w1 =
         new IntervalWindow(
@@ -267,20 +339,26 @@ public class SplittableParDoTest {
         new IntervalWindow(
             base.minus(Duration.standardMinutes(3)), base.plus(Duration.standardMinutes(3)));
 
-    tester.startElement(
-        WindowedValue.of(
-            ElementAndRestriction.of(42, new SomeRestriction()),
-            base,
-            Arrays.asList(w1, w2, w3),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    for (IntervalWindow w : new IntervalWindow[] {w1, w2, w3}) {
-      assertEquals(
-          Arrays.asList(
-              TimestampedValue.of("42a", base),
-              TimestampedValue.of("42b", base),
-              TimestampedValue.of("42c", base)),
-          tester.peekOutputElementsInWindow(w));
+    for (WindowExplosion explosion : WindowExplosion.values()) {
+      ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
+          new ProcessFnTester<>(
+              base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class));
+      tester.startElement(
+          WindowedValue.of(
+              ElementAndRestriction.of(42, new SomeRestriction()),
+              base,
+              Arrays.asList(w1, w2, w3),
+              PaneInfo.ON_TIME_AND_ONLY_FIRING),
+          explosion);
+
+      for (IntervalWindow w : new IntervalWindow[] {w1, w2, w3}) {
+        assertEquals(
+            Arrays.asList(
+                TimestampedValue.of("42a", base),
+                TimestampedValue.of("42b", base),
+                TimestampedValue.of("42c", base)),
+            tester.peekOutputElementsInWindow(w));
+      }
     }
   }
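
Note on the test change above: it feeds the same element to `ProcessFn` both exploded (one `WindowedValue` per window) and unexploded, and expects identical output per window. Per the commit message, `ProcessFn` handles the exploded case by merging the copies back into a single value, which is safe because the values and timestamps are guaranteed to be the same. The sketch below illustrates that idea in plain Java under simplifying assumptions: `Windowed` and `unexplode` are hypothetical names, windows are plain strings, and panes are ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class UnexplodeSketch {
  /** Hypothetical simplified windowed value: a value, a timestamp and window names. */
  public static final class Windowed<T> {
    public final T value;
    public final long timestampMillis;
    public final List<String> windows;

    public Windowed(T value, long timestampMillis, List<String> windows) {
      this.value = value;
      this.timestampMillis = timestampMillis;
      this.windows = Collections.unmodifiableList(new ArrayList<>(windows));
    }

    /** Returns one copy per window, analogous to exploding windows. */
    public List<Windowed<T>> explode() {
      List<Windowed<T>> result = new ArrayList<>();
      for (String window : windows) {
        result.add(new Windowed<>(value, timestampMillis, Collections.singletonList(window)));
      }
      return result;
    }
  }

  /** Merges per-window copies back into one value carrying all windows. */
  public static <T> Windowed<T> unexplode(List<Windowed<T>> parts) {
    if (parts.isEmpty()) {
      throw new IllegalArgumentException("expected at least one windowed value");
    }
    Windowed<T> first = parts.get(0);
    List<String> windows = new ArrayList<>();
    for (Windowed<T> part : parts) {
      // The merge is only valid because every copy shares the value and timestamp.
      if (!part.value.equals(first.value) || part.timestampMillis != first.timestampMillis) {
        throw new IllegalArgumentException("values and timestamps must match");
      }
      windows.addAll(part.windows);
    }
    return new Windowed<>(first.value, first.timestampMillis, windows);
  }

  public static void main(String[] args) {
    Windowed<Integer> original = new Windowed<>(42, 5L, Arrays.asList("w1", "w2", "w3"));
    Windowed<Integer> merged = unexplode(original.explode());
    System.out.println(merged.value + " @" + merged.timestampMillis + " in " + merged.windows);
  }
}
```

The sketch checks the shared-value precondition explicitly; whether the real `ProcessFn` validates it or simply relies on the guarantee is not visible in this part of the diff.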
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index 680a971..04becd7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -17,48 +17,23 @@
  */
 package org.apache.beam.runners.direct;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
-import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItemCoder;
-import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
-/** Provides an implementation of {@link GBKIntoKeyedWorkItems} for the Direct Runner. */
+/**
+ * Provides an implementation of {@link SplittableParDo.GBKIntoKeyedWorkItems} for the Direct
+ * Runner.
+ */
 class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT>
     implements PTransformOverrideFactory<
         PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>,
-        GBKIntoKeyedWorkItems<KeyT, InputT>> {
+        SplittableParDo.GBKIntoKeyedWorkItems<KeyT, InputT>> {
   @Override
   public PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>>
-      override(GBKIntoKeyedWorkItems<KeyT, InputT> transform) {
-    return new DirectGBKIntoKeyedWorkItems<>(transform.getName());
-  }
-
-  /** The Direct Runner specific implementation of {@link GBKIntoKeyedWorkItems}. */
-  private static class DirectGBKIntoKeyedWorkItems<KeyT, InputT>
-      extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
-    DirectGBKIntoKeyedWorkItems(String name) {
-      super(name);
-    }
-
-    @Override
-    public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
-      checkArgument(input.getCoder() instanceof KvCoder);
-      KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
-      return input
-          // TODO: Perhaps windowing strategy should instead be set by ReifyTAW, or by DGBKO
-          .setWindowingStrategyInternal(WindowingStrategy.globalDefault())
-          .apply(new DirectGroupByKey.DirectGroupByKeyOnly<KeyT, InputT>())
-          .setCoder(
-              KeyedWorkItemCoder.of(
-                  kvCoder.getKeyCoder(),
-                  kvCoder.getValueCoder(),
-                  input.getWindowingStrategy().getWindowFn().windowCoder()));
-    }
+      override(SplittableParDo.GBKIntoKeyedWorkItems<KeyT, InputT> transform) {
+    return new DirectGroupByKey.DirectGroupByKeyOnly<>();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 219314a..efee801 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -65,7 +65,7 @@ class DirectGroupByKey<K, V>
             KeyedWorkItemCoder.of(
                 inputCoder.getKeyCoder(),
                 inputCoder.getValueCoder(),
-                input.getWindowingStrategy().getWindowFn().windowCoder()))
+                inputWindowingStrategy.getWindowFn().windowCoder()))
 
         // Group each key's values by window, merging windows as needed.
         .apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index f71e109..82de9ab 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -30,7 +30,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
+import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
@@ -88,7 +88,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
               .put(ParDo.Bound.class, new ParDoSingleViaMultiOverrideFactory())
               .put(ParDo.BoundMulti.class, new ParDoMultiOverrideFactory())
               .put(
-                  GBKIntoKeyedWorkItems.class,
+                  SplittableParDo.GBKIntoKeyedWorkItems.class,
                   new DirectGBKIntoKeyedWorkItemsOverrideFactory())
               .build();
 
@@ -307,8 +307,8 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
     @SuppressWarnings("rawtypes")
     KeyedPValueTrackingVisitor keyedPValueVisitor =
         KeyedPValueTrackingVisitor.create(
-            ImmutableSet.<Class<? extends PTransform>>of(
-                GBKIntoKeyedWorkItems.class,
+            ImmutableSet.of(
+                SplittableParDo.GBKIntoKeyedWorkItems.class,
                 DirectGroupByKeyOnly.class,
                 DirectGroupAlsoByWindow.class));
     pipeline.traverseTopologically(keyedPValueVisitor);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
index 67d957c..cd644a6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
@@ -56,9 +56,9 @@ class DoFnLifecycleManager {
     thrownOnTeardown = new ConcurrentHashMap<>();
   }
 
-  public DoFn<?, ?> get() throws Exception {
+  public <InputT, OutputT> DoFn<InputT, OutputT> get() throws Exception {
     Thread currentThread = Thread.currentThread();
-    return outstanding.get(currentThread);
+    return (DoFn<InputT, OutputT>) outstanding.get(currentThread);
   }
 
   public void remove() throws Exception {
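
Note on the hunk above: `get()` becomes generic so that callers can write `fnManager.<InputT, OutputT>get()` instead of casting at every call site. The cast inside `get()` is unchecked, so the caller is responsible for choosing type arguments that match the cached fn. A minimal sketch of the same pattern follows. `PerThreadInstanceSketch` is a hypothetical name, and `ConcurrentHashMap.computeIfAbsent` is only a stand-in here, since the real `outstanding` cache is not shown in this hunk.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class PerThreadInstanceSketch {
  // One cached instance per calling thread (stand-in for the real cache).
  private final Map<Thread, Object> outstanding = new ConcurrentHashMap<>();
  private final Supplier<Object> factory;

  public PerThreadInstanceSketch(Supplier<Object> factory) {
    this.factory = factory;
  }

  /** Returns the current thread's instance, typed as the caller requests. */
  @SuppressWarnings("unchecked")
  public <T> T get() {
    // Unchecked: a wrong T only fails later, with a ClassCastException at the use site.
    return (T) outstanding.computeIfAbsent(Thread.currentThread(), thread -> factory.get());
  }

  /** Drops the current thread's instance so the next get() creates a fresh one. */
  public void remove() {
    outstanding.remove(Thread.currentThread());
  }

  public static void main(String[] args) {
    PerThreadInstanceSketch manager = new PerThreadInstanceSketch(StringBuilder::new);
    StringBuilder first = manager.get();
    StringBuilder second = manager.<StringBuilder>get();
    System.out.println(first == second);
    manager.remove();
    System.out.println(manager.<StringBuilder>get() == first);
  }
}
```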

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 750e5f1..504ddc4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.direct;
 import com.google.common.collect.ImmutableList;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,9 +57,9 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
     Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
     for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
       outputBundles.put(
-          outputEntry.getKey(),
-          evaluationContext.createBundle(outputEntry.getValue()));
+          outputEntry.getKey(), evaluationContext.createBundle(outputEntry.getValue()));
     }
+    BundleOutputManager outputManager = BundleOutputManager.create(outputBundles);
 
     ReadyCheckingSideInputReader sideInputReader =
         evaluationContext.createSideInputReader(sideInputs);
@@ -69,7 +68,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
             evaluationContext.getPipelineOptions(),
             fn,
             sideInputReader,
-            BundleOutputManager.create(outputBundles),
+            outputManager,
             mainOutputTag,
             sideOutputTags,
             stepContext,
@@ -85,12 +84,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
     }
 
     return new ParDoEvaluator<>(
-        evaluationContext,
-        runner,
-        application,
-        aggregatorChanges,
-        outputBundles.values(),
-        stepContext);
+        evaluationContext, runner, application, aggregatorChanges, outputManager, stepContext);
   }
 
   ////////////////////////////////////////////////////////////////////////////////////////////////
@@ -99,7 +93,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
   private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner;
   private final AppliedPTransform<?, ?, ?> transform;
   private final AggregatorContainer.Mutator aggregatorChanges;
-  private final Collection<UncommittedBundle<?>> outputBundles;
+  private final BundleOutputManager outputManager;
   private final DirectStepContext stepContext;
 
   private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElements;
@@ -109,17 +103,21 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
       PushbackSideInputDoFnRunner<InputT, ?> fnRunner,
       AppliedPTransform<?, ?, ?> transform,
       AggregatorContainer.Mutator aggregatorChanges,
-      Collection<UncommittedBundle<?>> outputBundles,
+      BundleOutputManager outputManager,
       DirectStepContext stepContext) {
     this.evaluationContext = evaluationContext;
     this.fnRunner = fnRunner;
     this.transform = transform;
-    this.outputBundles = outputBundles;
+    this.outputManager = outputManager;
     this.stepContext = stepContext;
     this.aggregatorChanges = aggregatorChanges;
     this.unprocessedElements = ImmutableList.builder();
   }
 
+  public BundleOutputManager getOutputManager() {
+    return outputManager;
+  }
+
   @Override
   public void processElement(WindowedValue<InputT> element) {
     try {
@@ -147,7 +145,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
       resultBuilder = StepTransformResult.withoutHold(transform);
     }
     return resultBuilder
-        .addOutput(outputBundles)
+        .addOutput(outputManager.bundles.values())
         .withTimerUpdate(stepContext.getTimerUpdate())
         .withAggregatorChanges(aggregatorChanges)
         .addUnprocessedElements(unprocessedElements.build())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 02e034a..ec5dc2c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -57,6 +57,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
   public <T> TransformEvaluator<T> forApplication(
       AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception {
 
+    @SuppressWarnings("unchecked")
     AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>>
         parDoApplication =
             (AppliedPTransform<
@@ -93,13 +94,12 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
    */
   @SuppressWarnings({"unchecked", "rawtypes"})
   TransformEvaluator<InputT> createEvaluator(
-        AppliedPTransform<PCollection<?>, PCollectionTuple, ?>
-        application,
-        StructuralKey<?> inputBundleKey,
-        DoFn<InputT, OutputT> doFn,
-        List<PCollectionView<?>> sideInputs,
-        TupleTag<OutputT> mainOutputTag,
-        List<TupleTag<?>> sideOutputTags)
+      AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
+      StructuralKey<?> inputBundleKey,
+      DoFn<InputT, OutputT> doFn,
+      List<PCollectionView<?>> sideInputs,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags)
       throws Exception {
     String stepName = evaluationContext.getStepName(application);
     DirectStepContext stepContext =
@@ -107,21 +107,40 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
             .getExecutionContext(application, inputBundleKey)
             .getOrCreateStepContext(stepName, stepName);
 
-    DoFnLifecycleManager fnManager = fnClones.getUnchecked(doFn);
+    DoFnLifecycleManager fnManager = getManagerForCloneOf(doFn);
 
+    return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(
+        createParDoEvaluator(
+            application,
+            sideInputs,
+            mainOutputTag,
+            sideOutputTags,
+            stepContext,
+            fnManager.<InputT, OutputT>get(),
+            fnManager),
+        fnManager);
+  }
+
+  ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
+      AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
+      List<PCollectionView<?>> sideInputs,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      DirectStepContext stepContext,
+      DoFn<InputT, OutputT> fn,
+      DoFnLifecycleManager fnManager)
+      throws Exception {
     try {
-      return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(
-          ParDoEvaluator.<InputT, OutputT>create(
-              evaluationContext,
-              stepContext,
-              application,
-              application.getInput().getWindowingStrategy(),
-              fnManager.get(),
-              sideInputs,
-              mainOutputTag,
-              sideOutputTags,
-              application.getOutput().getAll()),
-          fnManager);
+      return ParDoEvaluator.create(
+          evaluationContext,
+          stepContext,
+          application,
+          application.getInput().getWindowingStrategy(),
+          fn,
+          sideInputs,
+          mainOutputTag,
+          sideOutputTags,
+          application.getOutput().getAll());
     } catch (Exception e) {
       try {
         fnManager.remove();
@@ -134,4 +153,8 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
       throw e;
     }
   }
+
+  public DoFnLifecycleManager getManagerForCloneOf(DoFn<?, ?> fn) {
+    return fnClones.getUnchecked(fn);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 8db5159..9c9256d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -49,7 +49,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
     DoFn<InputT, OutputT> fn = transform.getNewFn();
     DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
     if (signature.processElement().isSplittable()) {
-      return new SplittableParDo(fn);
+      return new SplittableParDo(transform);
     } else if (signature.stateDeclarations().size() > 0
         || signature.timerDeclarations().size() > 0) {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
new file mode 100644
index 0000000..0eca710
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import java.util.Collection;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.ElementAndRestriction;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.TimerInternalsFactory;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT>
+    implements TransformEvaluatorFactory {
+  private final ParDoEvaluatorFactory<
+          KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+      delegateFactory;
+  private final EvaluationContext evaluationContext;
+
+  SplittableProcessElementsEvaluatorFactory(EvaluationContext evaluationContext) {
+    this.evaluationContext = evaluationContext;
+    this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext);
+  }
+
+  @Override
+  public <T> TransformEvaluator<T> forApplication(
+      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    TransformEvaluator<T> evaluator =
+        (TransformEvaluator<T>)
+            createEvaluator((AppliedPTransform) application, (CommittedBundle) inputBundle);
+    return evaluator;
+  }
+
+  @Override
+  public void cleanup() throws Exception {
+    delegateFactory.cleanup();
+  }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private TransformEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+      createEvaluator(
+          AppliedPTransform<
+                  PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>,
+                  PCollectionTuple, SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT>>
+              application,
+          CommittedBundle<InputT> inputBundle)
+          throws Exception {
+    final SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT> transform =
+        application.getTransform();
+
+    DoFnLifecycleManager fnManager = delegateFactory.getManagerForCloneOf(transform.getFn());
+
+    SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, ?> processFn =
+        transform.newProcessFn(fnManager.<InputT, OutputT>get());
+
+    String stepName = evaluationContext.getStepName(application);
+    final DirectExecutionContext.DirectStepContext stepContext =
+        evaluationContext
+            .getExecutionContext(application, inputBundle.getKey())
+            .getOrCreateStepContext(stepName, stepName);
+
+    ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+        parDoEvaluator =
+            delegateFactory.createParDoEvaluator(
+                application,
+                transform.getSideInputs(),
+                transform.getMainOutputTag(),
+                transform.getSideOutputTags().getAll(),
+                stepContext,
+                processFn,
+                fnManager);
+
+    processFn.setStateInternalsFactory(
+        new StateInternalsFactory<String>() {
+          @SuppressWarnings({"unchecked", "rawtypes"})
+          @Override
+          public StateInternals<String> stateInternalsForKey(String key) {
+            return (StateInternals) stepContext.stateInternals();
+          }
+        });
+
+    processFn.setTimerInternalsFactory(
+        new TimerInternalsFactory<String>() {
+          @Override
+          public TimerInternals timerInternalsForKey(String key) {
+            return stepContext.timerInternals();
+          }
+        });
+
+    final OutputManager outputManager = parDoEvaluator.getOutputManager();
+    processFn.setOutputWindowedValue(
+        new OutputWindowedValue<OutputT>() {
+          @Override
+          public void outputWindowedValue(
+              OutputT output,
+              Instant timestamp,
+              Collection<? extends BoundedWindow> windows,
+              PaneInfo pane) {
+            outputManager.output(
+                transform.getMainOutputTag(), WindowedValue.of(output, timestamp, windows, pane));
+          }
+
+          @Override
+          public <SideOutputT> void sideOutputWindowedValue(
+              TupleTag<SideOutputT> tag,
+              SideOutputT output,
+              Instant timestamp,
+              Collection<? extends BoundedWindow> windows,
+              PaneInfo pane) {
+            outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
+          }
+        });
+
+    return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnManager);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index a4c462a..1ddf9f4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
@@ -61,6 +62,10 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
             .put(
                 TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream.class,
                 new TestStreamEvaluatorFactory(ctxt))
+            // Runner-specific primitive used in expansion of SplittableParDo
+            .put(
+                SplittableParDo.ProcessElements.class,
+                new SplittableProcessElementsEvaluatorFactory<>(ctxt))
             .build();
     return new TransformEvaluatorRegistry(primitives);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
index c164ce6..f9e833f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -32,20 +33,28 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.MutableDateTime;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -66,6 +75,11 @@ public class SplittableDoFnTest {
       this.from = from;
       this.to = to;
     }
+
+    @Override
+    public String toString() {
+      return "OffsetRange{" + "from=" + from + ", to=" + to + '}';
+    }
   }
 
   private static class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
@@ -140,11 +154,8 @@ public class SplittableDoFnTest {
     }
   }
 
-  @Ignore(
-      "BEAM-801: SplittableParDo uses unsupported OldDoFn features that are not available in DoFn; "
-          + "It must be implemented as a primitive.")
   @Test
-  public void testPairWithIndexBasic() throws ClassNotFoundException {
+  public void testPairWithIndexBasic() {
     Pipeline p = TestPipeline.create();
     p.getOptions().setRunner(DirectRunner.class);
     PCollection<KV<String, Integer>> res =
@@ -167,11 +178,8 @@ public class SplittableDoFnTest {
     p.run();
   }
 
-  @Ignore(
-      "BEAM-801: SplittableParDo uses unsupported OldDoFn features that are not available in DoFn; "
-          + "It must be implemented as a primitive.")
   @Test
-  public void testPairWithIndexWindowedTimestamped() throws ClassNotFoundException {
+  public void testPairWithIndexWindowedTimestamped() {
     // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps
     // of elements in the input collection.
     Pipeline p = TestPipeline.create();
@@ -228,4 +236,172 @@ public class SplittableDoFnTest {
     }
     p.run();
   }
+
+  private static class SDFWithSideInputsAndOutputs extends DoFn<Integer, String> {
+    private final PCollectionView<String> sideInput;
+    private final TupleTag<String> sideOutput;
+
+    private SDFWithSideInputsAndOutputs(
+        PCollectionView<String> sideInput, TupleTag<String> sideOutput) {
+      this.sideInput = sideInput;
+      this.sideOutput = sideOutput;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
+      checkState(tracker.tryClaim(tracker.currentRestriction().from));
+      String side = c.sideInput(sideInput);
+      c.output("main:" + side + ":" + c.element());
+      c.sideOutput(sideOutput, "side:" + side + ":" + c.element());
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(Integer value) {
+      return new OffsetRange(0, 1);
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(OffsetRange range) {
+      return new OffsetRangeTracker(range);
+    }
+  }
+
+  @Test
+  public void testSideInputsAndOutputs() throws Exception {
+    Pipeline p = TestPipeline.create();
+    p.getOptions().setRunner(DirectRunner.class);
+
+    PCollectionView<String> sideInput =
+        p.apply("side input", Create.of("foo")).apply(View.<String>asSingleton());
+    TupleTag<String> mainOutputTag = new TupleTag<>("main");
+    TupleTag<String> sideOutputTag = new TupleTag<>("side");
+
+    PCollectionTuple res =
+        p.apply("input", Create.of(0, 1, 2))
+            .apply(
+                ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, sideOutputTag))
+                    .withSideInputs(sideInput)
+                    .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+    res.get(mainOutputTag).setCoder(StringUtf8Coder.of());
+    res.get(sideOutputTag).setCoder(StringUtf8Coder.of());
+
+    PAssert.that(res.get(mainOutputTag))
+        .containsInAnyOrder(Arrays.asList("main:foo:0", "main:foo:1", "main:foo:2"));
+    PAssert.that(res.get(sideOutputTag))
+        .containsInAnyOrder(Arrays.asList("side:foo:0", "side:foo:1", "side:foo:2"));
+
+    p.run();
+  }
+
+  @Test
+  public void testLateData() throws Exception {
+    Pipeline p = TestPipeline.create();
+    p.getOptions().setRunner(DirectRunner.class);
+
+    Instant base = Instant.now();
+
+    TestStream<String> stream =
+        TestStream.create(StringUtf8Coder.of())
+            .advanceWatermarkTo(base)
+            .addElements("aa")
+            .advanceWatermarkTo(base.plus(Duration.standardSeconds(5)))
+            .addElements(TimestampedValue.of("bb", base.minus(Duration.standardHours(1))))
+            .advanceProcessingTime(Duration.standardHours(1))
+            .advanceWatermarkToInfinity();
+
+    PCollection<String> input =
+        p.apply(stream)
+            .apply(
+                Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
+                    .withAllowedLateness(Duration.standardMinutes(1)));
+
+    PCollection<KV<String, Integer>> afterSDF =
+        input
+            .apply(ParDo.of(new PairStringWithIndexToLength()))
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
+
+    PCollection<String> nonLate =
+        afterSDF.apply(GroupByKey.<String, Integer>create()).apply(Keys.<String>create());
+
+    // The splittable DoFn itself should not drop any data and should act as a pass-through.
+    PAssert.that(afterSDF)
+        .containsInAnyOrder(
+            Arrays.asList(KV.of("aa", 0), KV.of("aa", 1), KV.of("bb", 0), KV.of("bb", 1)));
+
+    // But it should preserve the windowing strategy of the data, including allowed lateness:
+    // the follow-up GBK should drop the late data.
+    assertEquals(input.getWindowingStrategy(), afterSDF.getWindowingStrategy());
+    PAssert.that(nonLate).containsInAnyOrder("aa");
+
+    p.run();
+  }
+
+  private static class SDFWithLifecycle extends DoFn<String, String> {
+    private enum State {
+      BEFORE_SETUP,
+      OUTSIDE_BUNDLE,
+      INSIDE_BUNDLE,
+      TORN_DOWN
+    }
+
+    private State state = State.BEFORE_SETUP;
+
+    @ProcessElement
+    public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
+      assertEquals(State.INSIDE_BUNDLE, state);
+      assertTrue(tracker.tryClaim(0));
+      c.output(c.element());
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(String value) {
+      return new OffsetRange(0, 1);
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(OffsetRange range) {
+      return new OffsetRangeTracker(range);
+    }
+
+    @Setup
+    public void setUp() {
+      assertEquals(State.BEFORE_SETUP, state);
+      state = State.OUTSIDE_BUNDLE;
+    }
+
+    @StartBundle
+    public void startBundle(Context c) {
+      assertEquals(State.OUTSIDE_BUNDLE, state);
+      state = State.INSIDE_BUNDLE;
+    }
+
+    @FinishBundle
+    public void finishBundle(Context c) {
+      assertEquals(State.INSIDE_BUNDLE, state);
+      state = State.OUTSIDE_BUNDLE;
+    }
+
+    @Teardown
+    public void tearDown() {
+      assertEquals(State.OUTSIDE_BUNDLE, state);
+      state = State.TORN_DOWN;
+    }
+  }
+
+  @Test
+  public void testLifecycleMethods() throws Exception {
+    Pipeline p = TestPipeline.create();
+    p.getOptions().setRunner(DirectRunner.class);
+
+    PCollection<String> res =
+        p.apply(Create.of("a", "b", "c")).apply(ParDo.of(new SDFWithLifecycle()));
+
+    PAssert.that(res).containsInAnyOrder("a", "b", "c");
+
+    p.run();
+  }
+
+  // TODO (https://issues.apache.org/jira/browse/BEAM-988): Test that Splittable DoFn
+  // emits output immediately (i.e. has a pass-through trigger) regardless of input's
+  // windowing/triggering strategy.
 }

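Note on the lifecycle test above: SDFWithLifecycle asserts a small state machine (BEFORE_SETUP, OUTSIDE_BUNDLE, INSIDE_BUNDLE, TORN_DOWN) from inside its annotated methods. The sketch below replays that ordering in plain Java without any Beam dependency. The class LifecycleOrderSketch, its move helper and the run driver are hypothetical names introduced only for illustration. They are not part of this commit, and the DirectRunner's actual bundling of elements may differ from what the driver does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a DoFn with lifecycle methods; not the DirectRunner. */
class LifecycleOrderSketch {
  enum State { BEFORE_SETUP, OUTSIDE_BUNDLE, INSIDE_BUNDLE, TORN_DOWN }

  private State state = State.BEFORE_SETUP;
  private final List<String> processed = new ArrayList<>();

  /** Checks the current state and advances, mirroring the assertEquals calls in the test. */
  private void move(State expected, State next) {
    if (state != expected) {
      throw new IllegalStateException("expected " + expected + " but was " + state);
    }
    state = next;
  }

  void setup() { move(State.BEFORE_SETUP, State.OUTSIDE_BUNDLE); }

  void startBundle() { move(State.OUTSIDE_BUNDLE, State.INSIDE_BUNDLE); }

  void process(String element) {
    move(State.INSIDE_BUNDLE, State.INSIDE_BUNDLE); // elements are only legal inside a bundle
    processed.add(element);
  }

  void finishBundle() { move(State.INSIDE_BUNDLE, State.OUTSIDE_BUNDLE); }

  void teardown() { move(State.OUTSIDE_BUNDLE, State.TORN_DOWN); }

  /** Assumed driver: one setup, any number of bundles, one teardown. */
  static List<String> run(List<List<String>> bundles) {
    LifecycleOrderSketch fn = new LifecycleOrderSketch();
    fn.setup();
    for (List<String> bundle : bundles) {
      fn.startBundle();
      for (String element : bundle) {
        fn.process(element);
      }
      fn.finishBundle();
    }
    fn.teardown();
    return fn.processed;
  }

  public static void main(String[] args) {
    // Two bundles processed by the same instance.
    System.out.println(run(Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"))));
  }
}
```

Calling any method out of order, for example startBundle() before setup(), throws IllegalStateException in this sketch, which corresponds to a failed assertEquals in the test.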
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 3f1a3f9..7aabec9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -120,6 +120,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
      * should be in, throwing an exception if the {@code WindowFn} attempts
      * to access any information about the input element. The output element
      * will have a timestamp of negative infinity.
+     *
+     * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from
+     * {@link StartBundle} or {@link FinishBundle} methods.
      */
     public abstract void output(OutputT output);
 
@@ -142,6 +145,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
      * should be in, throwing an exception if the {@code WindowFn} attempts
      * to access any information about the input element except for the
      * timestamp.
+     *
+     * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from
+     * {@link StartBundle} or {@link FinishBundle} methods.
      */
     public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
 
@@ -168,6 +174,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
      * to access any information about the input element. The output element
      * will have a timestamp of negative infinity.
      *
+     * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from
+     * {@link StartBundle} or {@link FinishBundle} methods.
+     *
      * @see ParDo#withOutputTags
      */
     public abstract <T> void sideOutput(TupleTag<T> tag, T output);
@@ -192,6 +201,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
      * to access any information about the input element except for the
      * timestamp.
      *
+     * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from
+     * {@link StartBundle} or {@link FinishBundle} methods.
+     *
      * @see ParDo#withOutputTags
      */
     public abstract <T> void sideOutputWithTimestamp(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index daa8a06..0c6043f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -140,6 +140,15 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     windowValues.put(window, value);
   }
 
+  @SuppressWarnings("unchecked")
+  public <K> StateInternals<K> getStateInternals() {
+    return (StateInternals<K>) stateInternals;
+  }
+
+  public TimerInternals getTimerInternals() {
+    return timerInternals;
+  }
+
   /**
    * When a {@link DoFnTester} should clone the {@link DoFn} under test and how it should manage
    * the lifecycle of the {@link DoFn}.
@@ -321,7 +330,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
    *
    */
   public List<OutputT> peekOutputElements() {
-    // TODO: Should we return an unmodifiable list?
     return Lists.transform(
         peekOutputElementsWithTimestamp(),
         new Function<TimestampedValue<OutputT>, OutputT>() {
@@ -344,7 +352,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
   @Experimental
   public List<TimestampedValue<OutputT>> peekOutputElementsWithTimestamp() {
     // TODO: Should we return an unmodifiable list?
-    return Lists.transform(getOutput(mainOutputTag),
+    return Lists.transform(getImmutableOutput(mainOutputTag),
         new Function<WindowedValue<OutputT>, TimestampedValue<OutputT>>() {
           @Override
           @SuppressWarnings("unchecked")
@@ -370,7 +378,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       TupleTag<OutputT> tag,
       BoundedWindow window) {
     ImmutableList.Builder<TimestampedValue<OutputT>> valuesBuilder = ImmutableList.builder();
-    for (WindowedValue<OutputT> value : getOutput(tag)) {
+    for (WindowedValue<OutputT> value : getImmutableOutput(tag)) {
       if (value.getWindows().contains(window)) {
         valuesBuilder.add(TimestampedValue.of(value.getValue(), value.getTimestamp()));
       }
@@ -384,7 +392,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
    * @see #peekOutputElements
    */
   public void clearOutputElements() {
-    peekOutputElements().clear();
+    getMutableOutput(mainOutputTag).clear();
   }
 
   /**
@@ -425,7 +433,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
    */
   public <T> List<T> peekSideOutputElements(TupleTag<T> tag) {
     // TODO: Should we return an unmodifiable list?
-    return Lists.transform(getOutput(tag),
+    return Lists.transform(getImmutableOutput(tag),
         new Function<WindowedValue<T>, T>() {
           @SuppressWarnings("unchecked")
           @Override
@@ -441,7 +449,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
    * @see #peekSideOutputElements
    */
   public <T> void clearSideOutputElements(TupleTag<T> tag) {
-    peekSideOutputElements(tag).clear();
+    getMutableOutput(tag).clear();
   }
 
   /**
@@ -502,10 +510,25 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     return combiner.extractOutput(accumulator);
   }
 
-  private <T> List<WindowedValue<T>> getOutput(TupleTag<T> tag) {
+  private <T> List<WindowedValue<T>> getImmutableOutput(TupleTag<T> tag) {
     @SuppressWarnings({"unchecked", "rawtypes"})
     List<WindowedValue<T>> elems = (List) outputs.get(tag);
-    return MoreObjects.firstNonNull(elems, Collections.<WindowedValue<T>>emptyList());
+    return ImmutableList.copyOf(
+        MoreObjects.firstNonNull(elems, Collections.<WindowedValue<T>>emptyList()));
+  }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public <T> List<WindowedValue<T>> getMutableOutput(TupleTag<T> tag) {
+    List<WindowedValue<T>> outputList = (List) outputs.get(tag);
+    if (outputList == null) {
+      outputList = new ArrayList<>();
+      outputs.put(tag, (List) outputList);
+    }
+    return outputList;
+  }
+
+  public TupleTag<OutputT> getMainOutputTag() {
+    return mainOutputTag;
   }
 
   private TestContext createContext(OldDoFn<InputT, OutputT> fn) {
@@ -590,17 +613,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     }
 
     public <T> void noteOutput(TupleTag<T> tag, WindowedValue<T> output) {
-      getOutputList(tag).add(output);
-    }
-
-    private <T> List<WindowedValue<T>> getOutputList(TupleTag<T> tag) {
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      List<WindowedValue<T>> outputList = (List) outputs.get(tag);
-      if (outputList == null) {
-        outputList = new ArrayList<>();
-        outputs.put(tag, (List) outputList);
-      }
-      return outputList;
+      getMutableOutput(tag).add(output);
     }
   }
 

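Note on the DoFnTester change above: the diff separates a read path (getImmutableOutput, which copies) from a write path (getMutableOutput, which lazily creates and returns the backing list), and the clear* methods now go through the write path. The sketch below shows the same split using only JDK collections. TaggedOutputsSketch, snapshot and mutable are hypothetical names, String tags stand in for TupleTag, and Collections.unmodifiableList over a copy stands in for Guava's ImmutableList.copyOf, which is what the diff actually uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the tagged output storage inside DoFnTester. */
class TaggedOutputsSketch {
  private final Map<String, List<String>> outputs = new HashMap<>();

  /** Read path: returns a copy, so later writes do not show up in it. */
  List<String> snapshot(String tag) {
    List<String> elems = outputs.get(tag);
    if (elems == null) {
      return Collections.<String>emptyList();
    }
    return Collections.unmodifiableList(new ArrayList<>(elems));
  }

  /** Write path: returns the backing list, creating it on first use. */
  List<String> mutable(String tag) {
    List<String> elems = outputs.get(tag);
    if (elems == null) {
      elems = new ArrayList<>();
      outputs.put(tag, elems);
    }
    return elems;
  }

  public static void main(String[] args) {
    TaggedOutputsSketch store = new TaggedOutputsSketch();
    store.mutable("main").add("a");
    List<String> peeked = store.snapshot("main"); // copy taken before "b" is added
    store.mutable("main").add("b");
    System.out.println(peeked);
    System.out.println(store.snapshot("main"));
    store.mutable("main").clear(); // clearing goes through the write path
    System.out.println(store.snapshot("main").isEmpty());
  }
}
```

With this split, clearing through a peeked result can no longer affect the stored outputs, which is presumably why clearOutputElements and clearSideOutputElements were switched to getMutableOutput in the diff.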
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerInternalsFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerInternalsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerInternalsFactory.java
new file mode 100644
index 0000000..b9c3d5e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerInternalsFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.util.TimerInternals;
+
+/**
+ * A factory for providing {@link TimerInternals} for a particular key.
+ *
+ * <p>It will generally be embedded in a {@link org.apache.beam.sdk.transforms.DoFn DoFn} at
+ * execution time. Note that this interface does not itself extend {@link Serializable}.
+ */
+@Experimental(Kind.STATE)
+public interface TimerInternalsFactory<K> {
+
+  /** Returns {@link TimerInternals} for the provided key. */
+  TimerInternals timerInternalsForKey(K key);
+}