You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/10/13 00:38:28 UTC

[1/4] incubator-beam git commit: [BEAM-65] SplittableDoFn prototype.

Repository: incubator-beam
Updated Branches:
  refs/heads/master a5d129361 -> 13b45895e


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index d057765..0bfe2be 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -17,14 +17,32 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-
+import static org.mockito.Mockito.when;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.GetInitialRestriction;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowingInternals;
@@ -34,7 +52,9 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.AdditionalAnswers;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
 /** Tests for {@link DoFnInvokers}. */
@@ -76,11 +96,16 @@ public class DoFnInvokersTest {
           public WindowingInternals<String, String> windowingInternals() {
             return mockWindowingInternals;
           }
+
+          @Override
+          public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+            return null;
+          }
         };
   }
 
-  private void invokeProcessElement(DoFn<String, String> fn) {
-    DoFnInvokers.INSTANCE
+  private ProcessContinuation invokeProcessElement(DoFn<String, String> fn) {
+    return DoFnInvokers.INSTANCE
         .newByteBuddyInvoker(fn)
         .invokeProcessElement(mockContext, extraContextFactory);
   }
@@ -106,9 +131,9 @@ public class DoFnInvokersTest {
       @ProcessElement
       public void processElement(ProcessContext c) throws Exception {}
     }
-    MockFn fn = mock(MockFn.class);
-    invokeProcessElement(fn);
-    verify(fn).processElement(mockContext);
+    MockFn mockFn = mock(MockFn.class);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(mockFn));
+    verify(mockFn).processElement(mockContext);
   }
 
   interface InterfaceWithProcessElement {
@@ -128,7 +153,7 @@ public class DoFnInvokersTest {
   public void testDoFnWithProcessElementInterface() throws Exception {
     IdentityUsingInterfaceWithProcessElement fn =
         mock(IdentityUsingInterfaceWithProcessElement.class);
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     verify(fn).processElement(mockContext);
   }
 
@@ -149,14 +174,14 @@ public class DoFnInvokersTest {
   @Test
   public void testDoFnWithMethodInSuperclass() throws Exception {
     IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class);
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     verify(fn).process(mockContext);
   }
 
   @Test
   public void testDoFnWithMethodInSubclass() throws Exception {
     IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class);
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     verify(fn).process(mockContext);
   }
 
@@ -167,7 +192,7 @@ public class DoFnInvokersTest {
       public void processElement(ProcessContext c, BoundedWindow w) throws Exception {}
     }
     MockFn fn = mock(MockFn.class);
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     verify(fn).processElement(mockContext, mockWindow);
   }
 
@@ -178,7 +203,7 @@ public class DoFnInvokersTest {
       public void processElement(ProcessContext c, OutputReceiver<String> o) throws Exception {}
     }
     MockFn fn = mock(MockFn.class);
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     verify(fn).processElement(mockContext, mockOutputReceiver);
   }
 
@@ -189,11 +214,35 @@ public class DoFnInvokersTest {
       public void processElement(ProcessContext c, InputProvider<String> o) throws Exception {}
     }
     MockFn fn = mock(MockFn.class);
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     verify(fn).processElement(mockContext, mockInputProvider);
   }
 
   @Test
+  public void testDoFnWithReturn() throws Exception {
+    class MockFn extends DoFn<String, String> {
+      @DoFn.ProcessElement
+      public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker)
+          throws Exception {
+        return null;
+      }
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(String element) {
+        return null;
+      }
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+    }
+    MockFn fn = mock(MockFn.class);
+    when(fn.processElement(mockContext, null)).thenReturn(ProcessContinuation.resume());
+    assertEquals(ProcessContinuation.resume(), invokeProcessElement(fn));
+  }
+
+  @Test
   public void testDoFnWithStartBundleSetupTeardown() throws Exception {
     class MockFn extends DoFn<String, String> {
       @ProcessElement
@@ -224,6 +273,154 @@ public class DoFnInvokersTest {
   }
 
   // ---------------------------------------------------------------------------------------
+  // Tests for invoking Splittable DoFn methods
+  // ---------------------------------------------------------------------------------------
+  private static class SomeRestriction {}
+
+  private abstract static class SomeRestrictionTracker
+      implements RestrictionTracker<SomeRestriction> {}
+
+  private static class SomeRestrictionCoder extends CustomCoder<SomeRestriction> {
+    public static SomeRestrictionCoder of() {
+      return new SomeRestrictionCoder();
+    }
+
+    @Override
+    public void encode(SomeRestriction value, OutputStream outStream, Context context) {}
+
+    @Override
+    public SomeRestriction decode(InputStream inStream, Context context) {
+      return null;
+    }
+  }
+
+  /** Public so Mockito can do "delegatesTo()" in the test below. */
+  public static class MockFn extends DoFn<String, String> {
+    @ProcessElement
+    public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) {
+      return null;
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(String element) {
+      return null;
+    }
+
+    @SplitRestriction
+    public void splitRestriction(
+        String element, SomeRestriction restriction, OutputReceiver<SomeRestriction> receiver) {}
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return null;
+    }
+
+    @GetRestrictionCoder
+    public SomeRestrictionCoder getRestrictionCoder() {
+      return null;
+    }
+  }
+
+  @Test
+  public void testSplittableDoFnWithAllMethods() throws Exception {
+    MockFn fn = mock(MockFn.class);
+    DoFnInvoker<String, String> invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+    final SomeRestrictionTracker tracker = mock(SomeRestrictionTracker.class);
+    final SomeRestrictionCoder coder = mock(SomeRestrictionCoder.class);
+    SomeRestriction restriction = new SomeRestriction();
+    final SomeRestriction part1 = new SomeRestriction();
+    final SomeRestriction part2 = new SomeRestriction();
+    final SomeRestriction part3 = new SomeRestriction();
+    when(fn.getRestrictionCoder()).thenReturn(coder);
+    when(fn.getInitialRestriction("blah")).thenReturn(restriction);
+    doAnswer(
+            AdditionalAnswers.delegatesTo(
+                new MockFn() {
+                  @DoFn.SplitRestriction
+                  @Override
+                  public void splitRestriction(
+                      String element,
+                      SomeRestriction restriction,
+                      DoFn.OutputReceiver<SomeRestriction> receiver) {
+                    receiver.output(part1);
+                    receiver.output(part2);
+                    receiver.output(part3);
+                  }
+                }))
+        .when(fn)
+        .splitRestriction(
+            eq("blah"), same(restriction), Mockito.<DoFn.OutputReceiver<SomeRestriction>>any());
+    when(fn.newTracker(restriction)).thenReturn(tracker);
+    when(fn.processElement(mockContext, tracker)).thenReturn(ProcessContinuation.resume());
+
+    assertEquals(coder, invoker.invokeGetRestrictionCoder(new CoderRegistry()));
+    assertEquals(restriction, invoker.invokeGetInitialRestriction("blah"));
+    final List<SomeRestriction> outputs = new ArrayList<>();
+    invoker.invokeSplitRestriction(
+        "blah",
+        restriction,
+        new DoFn.OutputReceiver<SomeRestriction>() {
+          @Override
+          public void output(SomeRestriction output) {
+            outputs.add(output);
+          }
+        });
+    assertEquals(Arrays.asList(part1, part2, part3), outputs);
+    assertEquals(tracker, invoker.invokeNewTracker(restriction));
+    assertEquals(
+        ProcessContinuation.resume(),
+        invoker.invokeProcessElement(
+            mockContext,
+            new DoFn.FakeExtraContextFactory<String, String>() {
+              @Override
+              public RestrictionTracker restrictionTracker() {
+                return tracker;
+              }
+            }));
+  }
+
+  @Test
+  public void testSplittableDoFnDefaultMethods() throws Exception {
+    class MockFn extends DoFn<String, String> {
+      @ProcessElement
+      public void processElement(ProcessContext c, SomeRestrictionTracker tracker) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(String element) {
+        return null;
+      }
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+    }
+    MockFn fn = mock(MockFn.class);
+    DoFnInvoker<String, String> invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+
+    CoderRegistry coderRegistry = new CoderRegistry();
+    coderRegistry.registerCoder(SomeRestriction.class, SomeRestrictionCoder.class);
+    assertThat(
+        invoker.<SomeRestriction>invokeGetRestrictionCoder(coderRegistry),
+        instanceOf(SomeRestrictionCoder.class));
+    invoker.invokeSplitRestriction(
+        "blah",
+        "foo",
+        new DoFn.OutputReceiver<String>() {
+          private boolean invoked;
+
+          @Override
+          public void output(String output) {
+            assertFalse(invoked);
+            invoked = true;
+            assertEquals("foo", output);
+          }
+        });
+    assertEquals(
+        ProcessContinuation.stop(), invoker.invokeProcessElement(mockContext, extraContextFactory));
+  }
+
+  // ---------------------------------------------------------------------------------------
   // Tests for ability to invoke private, inner and anonymous classes.
   // ---------------------------------------------------------------------------------------
 
@@ -235,14 +432,14 @@ public class DoFnInvokersTest {
   @Test
   public void testLocalPrivateDoFnClass() throws Exception {
     PrivateDoFnClass fn = mock(PrivateDoFnClass.class);
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     verify(fn).processThis(mockContext);
   }
 
   @Test
   public void testStaticPackagePrivateDoFnClass() throws Exception {
     DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass());
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockContext);
   }
 
@@ -250,28 +447,28 @@ public class DoFnInvokersTest {
   public void testInnerPackagePrivateDoFnClass() throws Exception {
     DoFn<String, String> fn =
         mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass());
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockContext);
   }
 
   @Test
   public void testStaticPrivateDoFnClass() throws Exception {
     DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass());
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockContext);
   }
 
   @Test
   public void testInnerPrivateDoFnClass() throws Exception {
     DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass());
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockContext);
   }
 
   @Test
   public void testAnonymousInnerDoFn() throws Exception {
     DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass());
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockContext);
   }
 
@@ -279,7 +476,7 @@ public class DoFnInvokersTest {
   public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
     // Can't use mockito for this one - the anonymous class is final and can't be mocked.
     DoFn<String, String> fn = DoFnInvokersTestHelper.newStaticAnonymousDoFn();
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyStaticAnonymousDoFnInvoked(fn, mockContext);
   }
 
@@ -303,6 +500,32 @@ public class DoFnInvokersTest {
   }
 
   @Test
+  public void testProcessElementExceptionWithReturn() throws Exception {
+    thrown.expect(UserCodeException.class);
+    thrown.expectMessage("bogus");
+    DoFnInvokers.INSTANCE
+        .newByteBuddyInvoker(
+            new DoFn<Integer, Integer>() {
+              @ProcessElement
+              public ProcessContinuation processElement(
+                  @SuppressWarnings("unused") ProcessContext c, SomeRestrictionTracker tracker) {
+                throw new IllegalArgumentException("bogus");
+              }
+
+              @GetInitialRestriction
+              public SomeRestriction getInitialRestriction(Integer element) {
+                return null;
+              }
+
+              @NewTracker
+              public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+                return null;
+              }
+            })
+        .invokeProcessElement(null, new DoFn.FakeExtraContextFactory<Integer, Integer>());
+  }
+
+  @Test
   public void testStartBundleException() throws Exception {
     DoFnInvoker<Integer, Integer> invoker =
         DoFnInvokers.INSTANCE.newByteBuddyInvoker(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
index c269dbd..9cb1d23 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
@@ -61,7 +61,7 @@ public class DoFnSignaturesProcessElementTest {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(
         "Integer is not a valid context parameter. "
-            + "Should be one of [BoundedWindow]");
+            + "Should be one of [BoundedWindow, RestrictionTracker<?>]");
 
     analyzeProcessElementMethod(
         new AnonymousMethod() {
@@ -72,7 +72,7 @@ public class DoFnSignaturesProcessElementTest {
   @Test
   public void testBadReturnType() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Must return void");
+    thrown.expectMessage("Must return void or ProcessContinuation");
 
     analyzeProcessElementMethod(
         new AnonymousMethod() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
new file mode 100644
index 0000000..a9a7c81
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.analyzeProcessElementMethod;
+import static org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.errors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.reflect.TypeToken;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.AnonymousMethod;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DoFnSignatures} focused on methods related to <a
+ * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+ */
+@SuppressWarnings("unused")
+@RunWith(JUnit4.class)
+public class DoFnSignaturesSplittableDoFnTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private static class SomeRestriction {}
+
+  private abstract static class SomeRestrictionTracker
+      implements RestrictionTracker<SomeRestriction> {}
+
+  private abstract static class SomeRestrictionCoder implements Coder<SomeRestriction> {}
+
+  @Test
+  public void testReturnsProcessContinuation() throws Exception {
+    DoFnSignature.ProcessElementMethod signature =
+        analyzeProcessElementMethod(
+            new AnonymousMethod() {
+              private DoFn.ProcessContinuation method(
+                  DoFn<Integer, String>.ProcessContext context) {
+                return null;
+              }
+            });
+
+    assertTrue(signature.hasReturnValue());
+  }
+
+  @Test
+  public void testHasRestrictionTracker() throws Exception {
+    DoFnSignature.ProcessElementMethod signature =
+        analyzeProcessElementMethod(
+            new AnonymousMethod() {
+              private void method(
+                  DoFn<Integer, String>.ProcessContext context, SomeRestrictionTracker tracker) {}
+            });
+
+    assertTrue(signature.isSplittable());
+    assertTrue(signature.extraParameters().contains(DoFnSignature.Parameter.RESTRICTION_TRACKER));
+    assertEquals(SomeRestrictionTracker.class, signature.trackerT().getRawType());
+  }
+
+  @Test
+  public void testSplittableProcessElementMustNotHaveOtherParams() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("must not have any extra context arguments");
+    thrown.expectMessage("BOUNDED_WINDOW");
+
+    DoFnSignature.ProcessElementMethod signature =
+        analyzeProcessElementMethod(
+            new AnonymousMethod() {
+              private void method(
+                  DoFn<Integer, String>.ProcessContext context,
+                  SomeRestrictionTracker tracker,
+                  BoundedWindow window) {}
+            });
+  }
+
+  @Test
+  public void testInfersBoundednessFromAnnotation() throws Exception {
+    class BaseSplittableFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(Integer element) {
+        return null;
+      }
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+    }
+
+    @BoundedPerElement
+    class BoundedSplittableFn extends BaseSplittableFn {}
+
+    @UnboundedPerElement
+    class UnboundedSplittableFn extends BaseSplittableFn {}
+
+    assertEquals(
+        PCollection.IsBounded.BOUNDED,
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(BaseSplittableFn.class)
+            .isBoundedPerElement());
+    assertEquals(
+        PCollection.IsBounded.BOUNDED,
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(BoundedSplittableFn.class)
+            .isBoundedPerElement());
+    assertEquals(
+        PCollection.IsBounded.UNBOUNDED,
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(UnboundedSplittableFn.class)
+            .isBoundedPerElement());
+  }
+
+  @Test
+  public void testUnsplittableIsBounded() throws Exception {
+    class UnsplittableFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context) {}
+    }
+    assertEquals(
+        PCollection.IsBounded.BOUNDED,
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(UnsplittableFn.class)
+            .isBoundedPerElement());
+  }
+
+  private static class BaseFnWithContinuation extends DoFn<Integer, String> {
+    @ProcessElement
+    public ProcessContinuation processElement(
+        ProcessContext context, SomeRestrictionTracker tracker) {
+      return null;
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer element) {
+      return null;
+    }
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return null;
+    }
+  }
+
+  @Test
+  public void testSplittableIsBoundedByDefault() throws Exception {
+    assertEquals(
+        PCollection.IsBounded.UNBOUNDED,
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(BaseFnWithContinuation.class)
+            .isBoundedPerElement());
+  }
+
+  @Test
+  public void testSplittableRespectsBoundednessAnnotation() throws Exception {
+    @BoundedPerElement
+    class BoundedFnWithContinuation extends BaseFnWithContinuation {}
+
+    assertEquals(
+        PCollection.IsBounded.BOUNDED,
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(BoundedFnWithContinuation.class)
+            .isBoundedPerElement());
+
+    @UnboundedPerElement
+    class UnboundedFnWithContinuation extends BaseFnWithContinuation {}
+
+    assertEquals(
+        PCollection.IsBounded.UNBOUNDED,
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(UnboundedFnWithContinuation.class)
+            .isBoundedPerElement());
+  }
+
+  @Test
+  public void testUnsplittableButDeclaresBounded() throws Exception {
+    @BoundedPerElement
+    class SomeFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context) {}
+    }
+
+    thrown.expectMessage("Non-splittable, but annotated as @Bounded");
+    DoFnSignatures.INSTANCE.getOrParseSignature(SomeFn.class);
+  }
+
+  @Test
+  public void testUnsplittableButDeclaresUnbounded() throws Exception {
+    @UnboundedPerElement
+    class SomeFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context) {}
+    }
+
+    thrown.expectMessage("Non-splittable, but annotated as @Unbounded");
+    DoFnSignatures.INSTANCE.getOrParseSignature(SomeFn.class);
+  }
+
+  /** Tests a splittable {@link DoFn} that defines all methods in their full form, correctly. */
+  @Test
+  public void testSplittableWithAllFunctions() throws Exception {
+    class GoodSplittableDoFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public ProcessContinuation processElement(
+          ProcessContext context, SomeRestrictionTracker tracker) {
+        return null;
+      }
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(Integer element) {
+        return null;
+      }
+
+      @SplitRestriction
+      public void splitRestriction(
+          Integer element, SomeRestriction restriction, OutputReceiver<SomeRestriction> receiver) {}
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+
+      @GetRestrictionCoder
+      public SomeRestrictionCoder getRestrictionCoder() {
+        return null;
+      }
+    }
+
+    DoFnSignature signature = DoFnSignatures.INSTANCE.getOrParseSignature(GoodSplittableDoFn.class);
+    assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
+    assertTrue(signature.processElement().isSplittable());
+    assertTrue(signature.processElement().hasReturnValue());
+    assertEquals(
+        SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
+    assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());
+    assertEquals(SomeRestrictionTracker.class, signature.newTracker().trackerT().getRawType());
+    assertEquals(SomeRestriction.class, signature.newTracker().restrictionT().getRawType());
+    assertEquals(SomeRestrictionCoder.class, signature.getRestrictionCoder().coderT().getRawType());
+  }
+
+  /**
+   * Tests a splittable {@link DoFn} that defines all methods in their full form, correctly, using
+   * generic types.
+   */
+  @Test
+  public void testSplittableWithAllFunctionsGeneric() throws Exception {
+    class GoodGenericSplittableDoFn<RestrictionT, TrackerT, CoderT> extends DoFn<Integer, String> {
+      @ProcessElement
+      public ProcessContinuation processElement(ProcessContext context, TrackerT tracker) {
+        return null;
+      }
+
+      @GetInitialRestriction
+      public RestrictionT getInitialRestriction(Integer element) {
+        return null;
+      }
+
+      @SplitRestriction
+      public void splitRestriction(
+          Integer element, RestrictionT restriction, OutputReceiver<RestrictionT> receiver) {}
+
+      @NewTracker
+      public TrackerT newTracker(RestrictionT restriction) {
+        return null;
+      }
+
+      @GetRestrictionCoder
+      public CoderT getRestrictionCoder() {
+        return null;
+      }
+    }
+
+    DoFnSignature signature =
+        DoFnSignatures.INSTANCE.getOrParseSignature(
+            new GoodGenericSplittableDoFn<
+                SomeRestriction, SomeRestrictionTracker, SomeRestrictionCoder>() {}.getClass());
+    assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
+    assertTrue(signature.processElement().isSplittable());
+    assertTrue(signature.processElement().hasReturnValue());
+    assertEquals(
+        SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
+    assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());
+    assertEquals(SomeRestrictionTracker.class, signature.newTracker().trackerT().getRawType());
+    assertEquals(SomeRestriction.class, signature.newTracker().restrictionT().getRawType());
+    assertEquals(SomeRestrictionCoder.class, signature.getRestrictionCoder().coderT().getRawType());
+  }
+
+  @Test
+  public void testSplittableMissingRequiredMethods() throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context, SomeRestrictionTracker tracker) {}
+    }
+
+    thrown.expectMessage(
+        "Splittable, but does not define the following required methods: "
+            + "[@GetInitialRestriction, @NewTracker]");
+    DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+  }
+
+  @Test
+  public void testNewTrackerReturnsWrongType() throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context, SomeRestrictionTracker tracker) {}
+
+      @NewTracker
+      public void newTracker(SomeRestriction restriction) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(Integer element) {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "Returns void, but must return a subtype of RestrictionTracker<SomeRestriction>");
+    DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+  }
+
+  @Test
+  public void testGetInitialRestrictionMismatchesNewTracker() throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context, SomeRestrictionTracker tracker) {}
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+
+      @GetInitialRestriction
+      public String getInitialRestriction(Integer element) {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "getInitialRestriction(Integer): Uses restriction type String, but @NewTracker method");
+    thrown.expectMessage("newTracker(SomeRestriction) uses restriction type SomeRestriction");
+    DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+  }
+
+  @Test
+  public void testGetRestrictionCoderReturnsWrongType() throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context, SomeRestrictionTracker tracker) {}
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(Integer element) {
+        return null;
+      }
+
+      @GetRestrictionCoder
+      public KvCoder getRestrictionCoder() {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "getRestrictionCoder() returns KvCoder which is not a subtype of Coder<SomeRestriction>");
+    DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+  }
+
+  @Test
+  public void testSplitRestrictionReturnsWrongType() throws Exception {
+    thrown.expectMessage(
+        "Third argument must be OutputReceiver<SomeRestriction>, but is OutputReceiver<String>");
+    DoFnSignatures.analyzeSplitRestrictionMethod(
+        errors(),
+        TypeToken.of(FakeDoFn.class),
+        new AnonymousMethod() {
+          void method(
+              Integer element, SomeRestriction restriction, DoFn.OutputReceiver<String> receiver) {}
+        }.getMethod(),
+        TypeToken.of(Integer.class));
+  }
+
+  @Test
+  public void testSplitRestrictionWrongElementArgument() throws Exception {
+    class BadFn {
+      private List<SomeRestriction> splitRestriction(String element, SomeRestriction restriction) {
+        return null;
+      }
+    }
+
+    thrown.expectMessage("First argument must be the element type Integer");
+    DoFnSignatures.analyzeSplitRestrictionMethod(
+        errors(),
+        TypeToken.of(FakeDoFn.class),
+        new AnonymousMethod() {
+          void method(
+              String element,
+              SomeRestriction restriction,
+              DoFn.OutputReceiver<SomeRestriction> receiver) {}
+        }.getMethod(),
+        TypeToken.of(Integer.class));
+  }
+
+  @Test
+  public void testSplitRestrictionWrongNumArguments() throws Exception {
+    thrown.expectMessage("Must have exactly 3 arguments");
+    DoFnSignatures.analyzeSplitRestrictionMethod(
+        errors(),
+        TypeToken.of(FakeDoFn.class),
+        new AnonymousMethod() {
+          private void method(
+              Integer element,
+              SomeRestriction restriction,
+              DoFn.OutputReceiver<SomeRestriction> receiver,
+              Object extra) {}
+        }.getMethod(),
+        TypeToken.of(Integer.class));
+  }
+
+  @Test
+  public void testSplitRestrictionConsistentButWrongType() throws Exception {
+    class OtherRestriction {}
+
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context, SomeRestrictionTracker tracker) {}
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(Integer element) {
+        return null;
+      }
+
+      @DoFn.SplitRestriction
+      public void splitRestriction(
+          Integer element,
+          OtherRestriction restriction,
+          OutputReceiver<OtherRestriction> receiver) {}
+    }
+
+    thrown.expectMessage(
+        "getInitialRestriction(Integer): Uses restriction type SomeRestriction, "
+            + "but @SplitRestriction method ");
+    thrown.expectMessage(
+        "splitRestriction(Integer, OtherRestriction, OutputReceiver) "
+            + "uses restriction type OtherRestriction");
+    DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+  }
+
+  @Test
+  public void testUnsplittableMustNotDefineExtraMethods() throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void processElement(ProcessContext context) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(Integer element) {
+        return null;
+      }
+
+      @SplitRestriction
+      public void splitRestriction(
+          Integer element, SomeRestriction restriction, OutputReceiver<SomeRestriction> receiver) {}
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+
+      @GetRestrictionCoder
+      public SomeRestrictionCoder getRestrictionCoder() {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "Non-splittable, but defines methods: "
+            + "[@GetInitialRestriction, @SplitRestriction, @NewTracker, @GetRestrictionCoder]");
+    DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+  }
+
+  @Test
+  public void testNewTrackerWrongNumArguments() throws Exception {
+    thrown.expectMessage("Must have a single argument");
+    DoFnSignatures.analyzeNewTrackerMethod(
+        errors(),
+        TypeToken.of(FakeDoFn.class),
+        new AnonymousMethod() {
+          private SomeRestrictionTracker method(SomeRestriction restriction, Object extra) {
+            return null;
+          }
+        }.getMethod());
+  }
+
+  @Test
+  public void testNewTrackerInconsistent() throws Exception {
+    thrown.expectMessage(
+        "Returns SomeRestrictionTracker, but must return a subtype of RestrictionTracker<String>");
+    DoFnSignatures.analyzeNewTrackerMethod(
+        errors(),
+        TypeToken.of(FakeDoFn.class),
+        new AnonymousMethod() {
+          private SomeRestrictionTracker method(String restriction) {
+            return null;
+          }
+        }.getMethod());
+  }
+}



[2/4] incubator-beam git commit: [BEAM-65] SplittableDoFn prototype.

Posted by bc...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 3eee74a..f671a67 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowingInternals;
@@ -37,8 +38,8 @@ import org.joda.time.Instant;
 /**
  * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
  *
- * @deprecated This class will go away when we start running {@link DoFn}'s directly (using
- * {@link DoFnInvoker}) rather than via {@link OldDoFn}.
+ * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link
+ *     DoFnInvoker}) rather than via {@link OldDoFn}.
  */
 @Deprecated
 public class DoFnAdapters {
@@ -176,6 +177,18 @@ public class DoFnAdapters {
   }
 
   /**
+   * If the fn was created using {@link #toOldDoFn}, returns the original {@link DoFn}. Otherwise,
+   * returns {@code null}.
+   */
+  public static <InputT, OutputT> DoFn<InputT, OutputT> getDoFn(OldDoFn<InputT, OutputT> fn) {
+    if (fn instanceof SimpleDoFnAdapter) {
+      return ((SimpleDoFnAdapter<InputT, OutputT>) fn).fn;
+    } else {
+      return null;
+    }
+  }
+
+  /**
    * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
    * OldDoFn}.
    */
@@ -324,6 +337,11 @@ public class DoFnAdapters {
     public DoFn.OutputReceiver<OutputT> outputReceiver() {
       throw new UnsupportedOperationException("outputReceiver() exists only for testing");
     }
+
+    @Override
+    public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+      throw new UnsupportedOperationException("This is a non-splittable DoFn");
+    }
   }
 
   /**
@@ -412,5 +430,10 @@ public class DoFnAdapters {
     public DoFn.OutputReceiver<OutputT> outputReceiver() {
       throw new UnsupportedOperationException("outputReceiver() exists only for testing");
     }
+
+    @Override
+    public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+      throw new UnsupportedOperationException("This is a non-splittable DoFn");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 11a4cbd..302bb02 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -46,7 +46,9 @@ import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.TimerCallback;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -222,8 +224,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     if (state == State.UNINITIALIZED) {
       initializeState();
     }
-    TestContext<InputT, OutputT> context = createContext(fn);
+    TestContext context = createContext(fn);
     context.setupDelegateAggregators();
+    // State and timer internals are per-bundle.
+    stateInternals = InMemoryStateInternals.forKey(new Object());
+    timerInternals = new InMemoryTimerInternals();
     try {
       fn.startBundle(context);
     } catch (UserCodeException e) {
@@ -460,6 +465,35 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     return extractAggregatorValue(agg.getName(), agg.getCombineFn());
   }
 
+  private static TimerCallback collectInto(final List<TimerInternals.TimerData> firedTimers) {
+    return new TimerCallback() {
+      @Override
+      public void onTimer(TimerInternals.TimerData timer) throws Exception {
+        firedTimers.add(timer);
+      }
+    };
+  }
+
+  public List<TimerInternals.TimerData> advanceInputWatermark(Instant newWatermark) {
+    try {
+      final List<TimerInternals.TimerData> firedTimers = new ArrayList<>();
+      timerInternals.advanceInputWatermark(collectInto(firedTimers), newWatermark);
+      return firedTimers;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public List<TimerInternals.TimerData> advanceProcessingTime(Instant newProcessingTime) {
+    try {
+      final List<TimerInternals.TimerData> firedTimers = new ArrayList<>();
+      timerInternals.advanceProcessingTime(collectInto(firedTimers), newProcessingTime);
+      return firedTimers;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private <AccumT, AggregateT> AggregateT extractAggregatorValue(
       String name, CombineFn<?, AccumT, AggregateT> combiner) {
     @SuppressWarnings("unchecked")
@@ -476,41 +510,27 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     return MoreObjects.firstNonNull(elems, Collections.<WindowedValue<T>>emptyList());
   }
 
-  private TestContext<InputT, OutputT> createContext(OldDoFn<InputT, OutputT> fn) {
-    return new TestContext<>(fn, options, mainOutputTag, outputs, accumulators);
+  private TestContext createContext(OldDoFn<InputT, OutputT> fn) {
+    return new TestContext();
   }
 
-  private static class TestContext<InT, OutT> extends OldDoFn<InT, OutT>.Context {
-    private final PipelineOptions opts;
-    private final TupleTag<OutT> mainOutputTag;
-    private final Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
-    private final Map<String, Object> accumulators;
-
-    public TestContext(
-        OldDoFn<InT, OutT> fn,
-        PipelineOptions opts,
-        TupleTag<OutT> mainOutputTag,
-        Map<TupleTag<?>, List<WindowedValue<?>>> outputs,
-        Map<String, Object> accumulators) {
+  private class TestContext extends OldDoFn<InputT, OutputT>.Context {
+    TestContext() {
       fn.super();
-      this.opts = opts;
-      this.mainOutputTag = mainOutputTag;
-      this.outputs = outputs;
-      this.accumulators = accumulators;
     }
 
     @Override
     public PipelineOptions getPipelineOptions() {
-      return opts;
+      return options;
     }
 
     @Override
-    public void output(OutT output) {
+    public void output(OutputT output) {
       sideOutput(mainOutputTag, output);
     }
 
     @Override
-    public void outputWithTimestamp(OutT output, Instant timestamp) {
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
       sideOutputWithTimestamp(mainOutputTag, output, timestamp);
     }
 
@@ -570,40 +590,27 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     }
   }
 
-  private TestProcessContext<InputT, OutputT> createProcessContext(
+  private TestProcessContext createProcessContext(
       OldDoFn<InputT, OutputT> fn,
       TimestampedValue<InputT> elem) {
     WindowedValue<InputT> windowedValue = WindowedValue.timestampedValueInGlobalWindow(
         elem.getValue(), elem.getTimestamp());
 
-    return new TestProcessContext<>(fn,
-        createContext(fn),
-        windowedValue,
-        mainOutputTag,
-        sideInputs);
-  }
-
-  private static class TestProcessContext<InT, OutT> extends OldDoFn<InT, OutT>.ProcessContext {
-    private final TestContext<InT, OutT> context;
-    private final TupleTag<OutT> mainOutputTag;
-    private final WindowedValue<InT> element;
-    private final Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs;
-
-    private TestProcessContext(
-        OldDoFn<InT, OutT> fn,
-        TestContext<InT, OutT> context,
-        WindowedValue<InT> element,
-        TupleTag<OutT> mainOutputTag,
-        Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs) {
+    return new TestProcessContext(windowedValue);
+  }
+
+  private class TestProcessContext extends OldDoFn<InputT, OutputT>.ProcessContext {
+    private final TestContext context;
+    private final WindowedValue<InputT> element;
+
+    private TestProcessContext(WindowedValue<InputT> element) {
       fn.super();
-      this.context = context;
+      this.context = createContext(fn);
       this.element = element;
-      this.mainOutputTag = mainOutputTag;
-      this.sideInputs = sideInputs;
     }
 
     @Override
-    public InT element() {
+    public InputT element() {
       return element.getValue();
     }
 
@@ -638,10 +645,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     }
 
     @Override
-    public WindowingInternals<InT, OutT> windowingInternals() {
-      return new WindowingInternals<InT, OutT>() {
-        StateInternals<?> stateInternals = InMemoryStateInternals.forKey(new Object());
-
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      return new WindowingInternals<InputT, OutputT>() {
         @Override
         public StateInternals<?> stateInternals() {
           return stateInternals;
@@ -649,7 +654,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
 
         @Override
         public void outputWindowedValue(
-            OutT output,
+            OutputT output,
             Instant timestamp,
             Collection<? extends BoundedWindow> windows,
             PaneInfo pane) {
@@ -658,8 +663,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
 
         @Override
         public TimerInternals timerInternals() {
-          throw
-              new UnsupportedOperationException("Timer Internals are not supported in DoFnTester");
+          return timerInternals;
         }
 
         @Override
@@ -695,12 +699,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     }
 
     @Override
-    public void output(OutT output) {
+    public void output(OutputT output) {
       sideOutput(mainOutputTag, output);
     }
 
     @Override
-    public void outputWithTimestamp(OutT output, Instant timestamp) {
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
       sideOutputWithTimestamp(mainOutputTag, output, timestamp);
     }
 
@@ -774,6 +778,9 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
   /** The outputs from the {@link DoFn} under test. */
   private Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
 
+  private InMemoryStateInternals<?> stateInternals;
+  private InMemoryTimerInternals timerInternals;
+
   /** The state of processing of the {@link DoFn} under test. */
   private State state = State.UNINITIALIZED;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 2443d8e..fdef908 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.common.collect.ImmutableList;
 import java.io.Serializable;
 import java.util.Arrays;
@@ -27,6 +29,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.StringUtils;
@@ -716,6 +719,8 @@ public class ParDo {
 
     @Override
     public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+      checkArgument(
+          !isSplittable(fn), "Splittable DoFn not supported by the current runner");
       return PCollection.<OutputT>createPrimitiveOutputInternal(
               input.getPipeline(),
               input.getWindowingStrategy(),
@@ -925,6 +930,9 @@ public class ParDo {
 
     @Override
     public PCollectionTuple apply(PCollection<? extends InputT> input) {
+      checkArgument(
+          !isSplittable(fn), "Splittable DoFn not supported by the current runner");
+
       PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
           input.getPipeline(),
           TupleTagList.of(mainOutputTag).and(sideOutputTags.getAll()),
@@ -997,4 +1005,15 @@ public class ParDo {
         .add(DisplayData.item("fn", fnClass)
             .withLabel("Transform Function"));
   }
+
+  private static boolean isSplittable(OldDoFn<?, ?> oldDoFn) {
+    DoFn<?, ?> fn = DoFnAdapters.getDoFn(oldDoFn);
+    if (fn == null) {
+      return false;
+    }
+    return DoFnSignatures.INSTANCE
+        .getOrParseSignature(fn.getClass())
+        .processElement()
+        .isSplittable();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index eb6961c..9672d53 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -17,7 +17,10 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 
 /**
  * Interface for invoking the {@code DoFn} processing methods.
@@ -43,7 +46,28 @@ public interface DoFnInvoker<InputT, OutputT> {
    *
    * @param c The {@link DoFn.ProcessContext} to invoke the fn with.
    * @param extra Factory for producing extra parameter objects (such as window), if necessary.
+   * @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link
+   *     DoFn.ProcessContinuation#stop()} if it returns {@code void}.
    */
-  void invokeProcessElement(
+  DoFn.ProcessContinuation invokeProcessElement(
       DoFn<InputT, OutputT>.ProcessContext c, DoFn.ExtraContextFactory<InputT, OutputT> extra);
+
+  /** Invoke the {@link DoFn.GetInitialRestriction} method on the bound {@link DoFn}. */
+  <RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element);
+
+  /**
+   * Invoke the {@link DoFn.GetRestrictionCoder} method on the bound {@link DoFn}. Called only
+   * during pipeline construction time.
+   */
+  <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(CoderRegistry coderRegistry);
+
+  /** Invoke the {@link DoFn.SplitRestriction} method on the bound {@link DoFn}. */
+  <RestrictionT> void invokeSplitRestriction(
+      InputT element,
+      RestrictionT restriction,
+      DoFn.OutputReceiver<RestrictionT> restrictionReceiver);
+
+  /** Invoke the {@link DoFn.NewTracker} method on the bound {@link DoFn}. */
+  <RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> TrackerT invokeNewTracker(
+      RestrictionT restriction);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index da88587..fd057c3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms.reflect;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.reflect.TypeToken;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -26,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import net.bytebuddy.ByteBuddy;
@@ -35,10 +37,12 @@ import net.bytebuddy.description.method.MethodDescription;
 import net.bytebuddy.description.modifier.FieldManifestation;
 import net.bytebuddy.description.modifier.Visibility;
 import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.description.type.TypeList;
 import net.bytebuddy.dynamic.DynamicType;
 import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.dynamic.scaffold.InstrumentedType;
 import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
+import net.bytebuddy.implementation.ExceptionMethod;
 import net.bytebuddy.implementation.FixedValue;
 import net.bytebuddy.implementation.Implementation;
 import net.bytebuddy.implementation.Implementation.Context;
@@ -48,6 +52,7 @@ import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
 import net.bytebuddy.implementation.bytecode.StackManipulation;
 import net.bytebuddy.implementation.bytecode.Throw;
 import net.bytebuddy.implementation.bytecode.assign.Assigner;
+import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
 import net.bytebuddy.implementation.bytecode.member.FieldAccess;
 import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
 import net.bytebuddy.implementation.bytecode.member.MethodReturn;
@@ -57,12 +62,17 @@ import net.bytebuddy.jar.asm.MethodVisitor;
 import net.bytebuddy.jar.asm.Opcodes;
 import net.bytebuddy.jar.asm.Type;
 import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /** Dynamically generates {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
 public class DoFnInvokers {
@@ -81,10 +91,10 @@ public class DoFnInvokers {
   private DoFnInvokers() {}
 
   /**
-   * Creates a {@link DoFnInvoker} for the given {@link Object}, which should be either a
-   * {@link DoFn} or an {@link OldDoFn}. The expected use would be to deserialize a user's
-   * function as an {@link Object} and then pass it to this method, so there is no need to
-   * statically specify what sort of object it is.
+   * Creates a {@link DoFnInvoker} for the given {@link Object}, which should be either a {@link
+   * DoFn} or an {@link OldDoFn}. The expected use would be to deserialize a user's function as an
+   * {@link Object} and then pass it to this method, so there is no need to statically specify what
+   * sort of object it is.
    *
    * @deprecated this is to be used only as a migration path for decoupling upgrades
    */
@@ -92,15 +102,16 @@ public class DoFnInvokers {
   public DoFnInvoker<?, ?> invokerFor(Object deserializedFn) {
     if (deserializedFn instanceof DoFn) {
       return newByteBuddyInvoker((DoFn<?, ?>) deserializedFn);
-    } else if (deserializedFn instanceof OldDoFn){
+    } else if (deserializedFn instanceof OldDoFn) {
       return new OldDoFnInvoker<>((OldDoFn<?, ?>) deserializedFn);
     } else {
-      throw new IllegalArgumentException(String.format(
-          "Cannot create a %s for %s; it should be either a %s or an %s.",
-          DoFnInvoker.class.getSimpleName(),
-          deserializedFn.toString(),
-          DoFn.class.getSimpleName(),
-          OldDoFn.class.getSimpleName()));
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot create a %s for %s; it should be either a %s or an %s.",
+              DoFnInvoker.class.getSimpleName(),
+              deserializedFn.toString(),
+              DoFn.class.getSimpleName(),
+              OldDoFn.class.getSimpleName()));
     }
   }
 
@@ -113,12 +124,13 @@ public class DoFnInvokers {
     }
 
     @Override
-    public void invokeProcessElement(
+    public DoFn.ProcessContinuation invokeProcessElement(
         DoFn<InputT, OutputT>.ProcessContext c, ExtraContextFactory<InputT, OutputT> extra) {
       OldDoFn<InputT, OutputT>.ProcessContext oldCtx =
           DoFnAdapters.adaptProcessContext(fn, c, extra);
       try {
         fn.processElement(oldCtx);
+        return DoFn.ProcessContinuation.stop();
       } catch (Throwable exc) {
         throw UserCodeException.wrap(exc);
       }
@@ -161,14 +173,37 @@ public class DoFnInvokers {
         throw UserCodeException.wrap(exc);
       }
     }
+
+    @Override
+    public <RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element) {
+      throw new UnsupportedOperationException("OldDoFn is not splittable");
+    }
+
+    @Override
+    public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(
+        CoderRegistry coderRegistry) {
+      throw new UnsupportedOperationException("OldDoFn is not splittable");
+    }
+
+    @Override
+    public <RestrictionT> void invokeSplitRestriction(
+        InputT element, RestrictionT restriction, DoFn.OutputReceiver<RestrictionT> receiver) {
+      throw new UnsupportedOperationException("OldDoFn is not splittable");
+    }
+
+    @Override
+    public <RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+        TrackerT invokeNewTracker(RestrictionT restriction) {
+      throw new UnsupportedOperationException("OldDoFn is not splittable");
+    }
   }
 
   /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
   @SuppressWarnings({"unchecked", "rawtypes"})
   public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
       DoFn<InputT, OutputT> fn) {
-    return newByteBuddyInvoker(DoFnSignatures.INSTANCE.getOrParseSignature(
-        (Class) fn.getClass()), fn);
+    return newByteBuddyInvoker(
+        DoFnSignatures.INSTANCE.getOrParseSignature((Class) fn.getClass()), fn);
   }
 
   /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
@@ -214,6 +249,32 @@ public class DoFnInvokers {
     return constructor;
   }
 
+  /** Default implementation of {@link DoFn.SplitRestriction}, for delegation by bytebuddy. */
+  public static class DefaultSplitRestriction {
+    /** Doesn't split the restriction. */
+    @SuppressWarnings("unused")
+    public static <InputT, RestrictionT> void invokeSplitRestriction(
+        InputT element, RestrictionT restriction, DoFn.OutputReceiver<RestrictionT> receiver) {
+      receiver.output(restriction);
+    }
+  }
+
+  /** Default implementation of {@link DoFn.GetRestrictionCoder}, for delegation by bytebuddy. */
+  public static class DefaultRestrictionCoder {
+    private final TypeToken<?> restrictionType;
+
+    DefaultRestrictionCoder(TypeToken<?> restrictionType) {
+      this.restrictionType = restrictionType;
+    }
+
+    /** Doesn't split the restriction. */
+    @SuppressWarnings({"unused", "unchecked"})
+    public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(CoderRegistry registry)
+        throws CannotProvideCoderException {
+      return (Coder) registry.getCoder(TypeDescriptor.of(restrictionType.getType()));
+    }
+  }
+
   /** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */
   private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(DoFnSignature signature) {
     Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
@@ -247,7 +308,15 @@ public class DoFnInvokers {
             .method(ElementMatchers.named("invokeSetup"))
             .intercept(delegateOrNoop(signature.setup()))
             .method(ElementMatchers.named("invokeTeardown"))
-            .intercept(delegateOrNoop(signature.teardown()));
+            .intercept(delegateOrNoop(signature.teardown()))
+            .method(ElementMatchers.named("invokeGetInitialRestriction"))
+            .intercept(delegateWithDowncastOrThrow(signature.getInitialRestriction()))
+            .method(ElementMatchers.named("invokeSplitRestriction"))
+            .intercept(splitRestrictionDelegation(signature))
+            .method(ElementMatchers.named("invokeGetRestrictionCoder"))
+            .intercept(getRestrictionCoderDelegation(signature))
+            .method(ElementMatchers.named("invokeNewTracker"))
+            .intercept(delegateWithDowncastOrThrow(signature.newTracker()));
 
     DynamicType.Unloaded<?> unloaded = builder.make();
 
@@ -260,6 +329,28 @@ public class DoFnInvokers {
     return res;
   }
 
+  private static Implementation getRestrictionCoderDelegation(DoFnSignature signature) {
+    if (signature.processElement().isSplittable()) {
+      if (signature.getRestrictionCoder() == null) {
+        return MethodDelegation.to(
+            new DefaultRestrictionCoder(signature.getInitialRestriction().restrictionT()));
+      } else {
+        return new DowncastingParametersMethodDelegation(
+            signature.getRestrictionCoder().targetMethod());
+      }
+    } else {
+      return ExceptionMethod.throwing(UnsupportedOperationException.class);
+    }
+  }
+
+  private static Implementation splitRestrictionDelegation(DoFnSignature signature) {
+    if (signature.splitRestriction() == null) {
+      return MethodDelegation.to(DefaultSplitRestriction.class);
+    } else {
+      return new DowncastingParametersMethodDelegation(signature.splitRestriction().targetMethod());
+    }
+  }
+
   /** Delegates to the given method if available, or does nothing. */
   private static Implementation delegateOrNoop(DoFnSignature.DoFnMethod method) {
     return (method == null)
@@ -267,6 +358,13 @@ public class DoFnInvokers {
         : new DoFnMethodDelegation(method.targetMethod());
   }
 
+  /** Delegates to the given method if available, or throws UnsupportedOperationException. */
+  private static Implementation delegateWithDowncastOrThrow(DoFnSignature.DoFnMethod method) {
+    return (method == null)
+        ? ExceptionMethod.throwing(UnsupportedOperationException.class)
+        : new DowncastingParametersMethodDelegation(method.targetMethod());
+  }
+
   /**
    * Implements a method of {@link DoFnInvoker} (the "instrumented method") by delegating to a
    * "target method" of the wrapped {@link DoFn}.
@@ -374,12 +472,37 @@ public class DoFnInvokers {
   }
 
   /**
+   * Passes parameters to the delegated method by downcasting each parameter of non-primitive type
+   * to its expected type.
+   */
+  private static class DowncastingParametersMethodDelegation extends DoFnMethodDelegation {
+    DowncastingParametersMethodDelegation(Method method) {
+      super(method);
+    }
+
+    @Override
+    protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
+      List<StackManipulation> pushParameters = new ArrayList<>();
+      TypeList.Generic paramTypes = targetMethod.getParameters().asTypeList();
+      for (int i = 0; i < paramTypes.size(); i++) {
+        TypeDescription.Generic paramT = paramTypes.get(i);
+        pushParameters.add(MethodVariableAccess.of(paramT).loadOffset(i + 1));
+        if (!paramT.isPrimitive()) {
+          pushParameters.add(TypeCasting.to(paramT));
+        }
+      }
+      return new StackManipulation.Compound(pushParameters);
+    }
+  }
+
+  /**
    * Implements the invoker's {@link DoFnInvoker#invokeProcessElement} method by delegating to the
    * {@link DoFn.ProcessElement} method.
    */
   private static final class ProcessElementDelegation extends DoFnMethodDelegation {
     private static final Map<DoFnSignature.Parameter, MethodDescription>
         EXTRA_CONTEXT_FACTORY_METHODS;
+    private static final MethodDescription PROCESS_CONTINUATION_STOP_METHOD;
 
     static {
       try {
@@ -397,11 +520,21 @@ public class DoFnInvokers {
             DoFnSignature.Parameter.OUTPUT_RECEIVER,
             new MethodDescription.ForLoadedMethod(
                 DoFn.ExtraContextFactory.class.getMethod("outputReceiver")));
+        methods.put(
+            DoFnSignature.Parameter.RESTRICTION_TRACKER,
+            new MethodDescription.ForLoadedMethod(
+                DoFn.ExtraContextFactory.class.getMethod("restrictionTracker")));
         EXTRA_CONTEXT_FACTORY_METHODS = Collections.unmodifiableMap(methods);
       } catch (Exception e) {
         throw new RuntimeException(
             "Failed to locate an ExtraContextFactory method that was expected to exist", e);
       }
+      try {
+        PROCESS_CONTINUATION_STOP_METHOD =
+            new MethodDescription.ForLoadedMethod(DoFn.ProcessContinuation.class.getMethod("stop"));
+      } catch (NoSuchMethodException e) {
+        throw new RuntimeException("Failed to locate ProcessContinuation.stop()");
+      }
     }
 
     private final DoFnSignature.ProcessElementMethod signature;
@@ -427,14 +560,26 @@ public class DoFnInvokers {
         parameters.add(
             new StackManipulation.Compound(
                 pushExtraContextFactory,
-                MethodInvocation.invoke(EXTRA_CONTEXT_FACTORY_METHODS.get(param))));
+                MethodInvocation.invoke(EXTRA_CONTEXT_FACTORY_METHODS.get(param)),
+                // ExtraContextFactory.restrictionTracker() returns a RestrictionTracker,
+                // but the @ProcessElement method expects a concrete subtype of it.
+                // Insert a downcast.
+                (param == DoFnSignature.Parameter.RESTRICTION_TRACKER)
+                    ? TypeCasting.to(
+                        new TypeDescription.ForLoadedType(signature.trackerT().getRawType()))
+                    : StackManipulation.Trivial.INSTANCE));
       }
       return new StackManipulation.Compound(parameters);
     }
 
     @Override
     protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
-      return MethodReturn.VOID;
+      if (TypeDescription.VOID.equals(targetMethod.getReturnType().asErasure())) {
+        return new StackManipulation.Compound(
+            MethodInvocation.invoke(PROCESS_CONTINUATION_STOP_METHOD), MethodReturn.REFERENCE);
+      } else {
+        return MethodReturn.returning(targetMethod.getReturnType().asErasure());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 756df07..632f817 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -18,11 +18,16 @@
 package org.apache.beam.sdk.transforms.reflect;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.reflect.TypeToken;
 import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.values.PCollection;
 
 /**
  * Describes the signature of a {@link DoFn}, in particular, which features it uses, which extra
@@ -35,6 +40,9 @@ public abstract class DoFnSignature {
   /** Class of the original {@link DoFn} from which this signature was produced. */
   public abstract Class<? extends DoFn<?, ?>> fnClass();
 
+  /** Whether this {@link DoFn} does a bounded amount of work per element. */
+  public abstract PCollection.IsBounded isBoundedPerElement();
+
   /** Details about this {@link DoFn}'s {@link DoFn.ProcessElement} method. */
   public abstract ProcessElementMethod processElement();
 
@@ -54,6 +62,22 @@ public abstract class DoFnSignature {
   @Nullable
   public abstract LifecycleMethod teardown();
 
+  /** Details about this {@link DoFn}'s {@link DoFn.GetInitialRestriction} method. */
+  @Nullable
+  public abstract GetInitialRestrictionMethod getInitialRestriction();
+
+  /** Details about this {@link DoFn}'s {@link DoFn.SplitRestriction} method. */
+  @Nullable
+  public abstract SplitRestrictionMethod splitRestriction();
+
+  /** Details about this {@link DoFn}'s {@link DoFn.GetRestrictionCoder} method. */
+  @Nullable
+  public abstract GetRestrictionCoderMethod getRestrictionCoder();
+
+  /** Details about this {@link DoFn}'s {@link DoFn.NewTracker} method. */
+  @Nullable
+  public abstract NewTrackerMethod newTracker();
+
   static Builder builder() {
     return new AutoValue_DoFnSignature.Builder();
   }
@@ -61,11 +85,16 @@ public abstract class DoFnSignature {
   @AutoValue.Builder
   abstract static class Builder {
     abstract Builder setFnClass(Class<? extends DoFn<?, ?>> fnClass);
+    abstract Builder setIsBoundedPerElement(PCollection.IsBounded isBounded);
     abstract Builder setProcessElement(ProcessElementMethod processElement);
     abstract Builder setStartBundle(BundleMethod startBundle);
     abstract Builder setFinishBundle(BundleMethod finishBundle);
     abstract Builder setSetup(LifecycleMethod setup);
     abstract Builder setTeardown(LifecycleMethod teardown);
+    abstract Builder setGetInitialRestriction(GetInitialRestrictionMethod getInitialRestriction);
+    abstract Builder setSplitRestriction(SplitRestrictionMethod splitRestriction);
+    abstract Builder setGetRestrictionCoder(GetRestrictionCoderMethod getRestrictionCoder);
+    abstract Builder setNewTracker(NewTrackerMethod newTracker);
     abstract DoFnSignature build();
   }
 
@@ -80,6 +109,7 @@ public abstract class DoFnSignature {
     BOUNDED_WINDOW,
     INPUT_PROVIDER,
     OUTPUT_RECEIVER,
+    RESTRICTION_TRACKER
   }
 
   /** Describes a {@link DoFn.ProcessElement} method. */
@@ -92,17 +122,33 @@ public abstract class DoFnSignature {
     /** Types of optional parameters of the annotated method, in the order they appear. */
     public abstract List<Parameter> extraParameters();
 
+    /** Concrete type of the {@link RestrictionTracker} parameter, if present. */
+    @Nullable
+    abstract TypeToken<?> trackerT();
+
+    /** Whether this {@link DoFn} returns a {@link ProcessContinuation} or void. */
+    public abstract boolean hasReturnValue();
+
     static ProcessElementMethod create(
         Method targetMethod,
-        List<Parameter> extraParameters) {
+        List<Parameter> extraParameters,
+        TypeToken<?> trackerT,
+        boolean hasReturnValue) {
       return new AutoValue_DoFnSignature_ProcessElementMethod(
-          targetMethod, Collections.unmodifiableList(extraParameters));
+          targetMethod, Collections.unmodifiableList(extraParameters), trackerT, hasReturnValue);
     }
 
     /** Whether this {@link DoFn} uses a Single Window. */
     public boolean usesSingleWindow() {
       return extraParameters().contains(Parameter.BOUNDED_WINDOW);
     }
+
+    /**
+     * Whether this {@link DoFn} is <a href="https://s.apache.org/splittable-do-fn">splittable</a>.
+     */
+    public boolean isSplittable() {
+      return extraParameters().contains(Parameter.RESTRICTION_TRACKER);
+    }
   }
 
   /** Describes a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method. */
@@ -128,4 +174,68 @@ public abstract class DoFnSignature {
       return new AutoValue_DoFnSignature_LifecycleMethod(targetMethod);
     }
   }
+
+  /** Describes a {@link DoFn.GetInitialRestriction} method. */
+  @AutoValue
+  public abstract static class GetInitialRestrictionMethod implements DoFnMethod {
+    /** The annotated method itself. */
+    @Override
+    public abstract Method targetMethod();
+
+    /** Type of the returned restriction. */
+    abstract TypeToken<?> restrictionT();
+
+    static GetInitialRestrictionMethod create(Method targetMethod, TypeToken<?> restrictionT) {
+      return new AutoValue_DoFnSignature_GetInitialRestrictionMethod(targetMethod, restrictionT);
+    }
+  }
+
+  /** Describes a {@link DoFn.SplitRestriction} method. */
+  @AutoValue
+  public abstract static class SplitRestrictionMethod implements DoFnMethod {
+    /** The annotated method itself. */
+    @Override
+    public abstract Method targetMethod();
+
+    /** Type of the restriction taken and returned. */
+    abstract TypeToken<?> restrictionT();
+
+    static SplitRestrictionMethod create(Method targetMethod, TypeToken<?> restrictionT) {
+      return new AutoValue_DoFnSignature_SplitRestrictionMethod(targetMethod, restrictionT);
+    }
+  }
+
+  /** Describes a {@link DoFn.NewTracker} method. */
+  @AutoValue
+  public abstract static class NewTrackerMethod implements DoFnMethod {
+    /** The annotated method itself. */
+    @Override
+    public abstract Method targetMethod();
+
+    /** Type of the input restriction. */
+    abstract TypeToken<?> restrictionT();
+
+    /** Type of the returned {@link RestrictionTracker}. */
+    abstract TypeToken<?> trackerT();
+
+    static NewTrackerMethod create(
+        Method targetMethod, TypeToken<?> restrictionT, TypeToken<?> trackerT) {
+      return new AutoValue_DoFnSignature_NewTrackerMethod(targetMethod, restrictionT, trackerT);
+    }
+  }
+
+  /** Describes a {@link DoFn.GetRestrictionCoder} method. */
+  @AutoValue
+  public abstract static class GetRestrictionCoderMethod implements DoFnMethod {
+    /** The annotated method itself. */
+    @Override
+    public abstract Method targetMethod();
+
+    /** Type of the returned {@link Coder}. */
+    abstract TypeToken<?> coderT();
+
+    static GetRestrictionCoderMethod create(Method targetMethod, TypeToken<?> coderT) {
+      return new AutoValue_DoFnSignature_GetRestrictionCoderMethod(targetMethod, coderT);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index ad15127..524ea24 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.reflect.TypeParameter;
 import com.google.common.reflect.TypeToken;
@@ -34,9 +36,12 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.sdk.values.PCollection;
 
 /**
  * Parses a {@link DoFn} and computes its {@link DoFnSignature}. See {@link #getOrParseSignature}.
@@ -88,6 +93,14 @@ public class DoFnSignatures {
     Method setupMethod = findAnnotatedMethod(errors, DoFn.Setup.class, fnClass, false);
     Method teardownMethod = findAnnotatedMethod(errors, DoFn.Teardown.class, fnClass, false);
 
+    Method getInitialRestrictionMethod =
+        findAnnotatedMethod(errors, DoFn.GetInitialRestriction.class, fnClass, false);
+    Method splitRestrictionMethod =
+        findAnnotatedMethod(errors, DoFn.SplitRestriction.class, fnClass, false);
+    Method getRestrictionCoderMethod =
+        findAnnotatedMethod(errors, DoFn.GetRestrictionCoder.class, fnClass, false);
+    Method newTrackerMethod = findAnnotatedMethod(errors, DoFn.NewTracker.class, fnClass, false);
+
     ErrorReporter processElementErrors =
         errors.forMethod(DoFn.ProcessElement.class, processElementMethod);
     DoFnSignature.ProcessElementMethod processElement =
@@ -119,7 +132,213 @@ public class DoFnSignatures {
               errors.forMethod(DoFn.Teardown.class, teardownMethod), teardownMethod));
     }
 
-    return builder.build();
+    DoFnSignature.GetInitialRestrictionMethod getInitialRestriction = null;
+    ErrorReporter getInitialRestrictionErrors = null;
+    if (getInitialRestrictionMethod != null) {
+      getInitialRestrictionErrors =
+          errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestrictionMethod);
+      builder.setGetInitialRestriction(
+          getInitialRestriction =
+              analyzeGetInitialRestrictionMethod(
+                  getInitialRestrictionErrors, fnToken, getInitialRestrictionMethod, inputT));
+    }
+
+    DoFnSignature.SplitRestrictionMethod splitRestriction = null;
+    if (splitRestrictionMethod != null) {
+      ErrorReporter splitRestrictionErrors =
+          errors.forMethod(DoFn.SplitRestriction.class, splitRestrictionMethod);
+      builder.setSplitRestriction(
+          splitRestriction =
+              analyzeSplitRestrictionMethod(
+                  splitRestrictionErrors, fnToken, splitRestrictionMethod, inputT));
+    }
+
+    DoFnSignature.GetRestrictionCoderMethod getRestrictionCoder = null;
+    if (getRestrictionCoderMethod != null) {
+      ErrorReporter getRestrictionCoderErrors =
+          errors.forMethod(DoFn.GetRestrictionCoder.class, getRestrictionCoderMethod);
+      builder.setGetRestrictionCoder(
+          getRestrictionCoder =
+              analyzeGetRestrictionCoderMethod(
+                  getRestrictionCoderErrors, fnToken, getRestrictionCoderMethod));
+    }
+
+    DoFnSignature.NewTrackerMethod newTracker = null;
+    if (newTrackerMethod != null) {
+      ErrorReporter newTrackerErrors = errors.forMethod(DoFn.NewTracker.class, newTrackerMethod);
+      builder.setNewTracker(
+          newTracker = analyzeNewTrackerMethod(newTrackerErrors, fnToken, newTrackerMethod));
+    }
+
+    builder.setIsBoundedPerElement(inferBoundedness(fnToken, processElement, errors));
+
+    DoFnSignature signature = builder.build();
+
+    // Additional validation for splittable DoFn's.
+    if (processElement.isSplittable()) {
+      verifySplittableMethods(signature, errors);
+    } else {
+      verifyUnsplittableMethods(errors, signature);
+    }
+
+    return signature;
+  }
+
+  /**
+   * Infers the boundedness of the {@link DoFn.ProcessElement} method (whether or not it performs a
+   * bounded amount of work per element) using the following criteria:
+   *
+   * <ol>
+   * <li>If the {@link DoFn} is not splittable, then it is bounded, it must not be annotated as
+   *     {@link DoFn.BoundedPerElement} or {@link DoFn.UnboundedPerElement}, and {@link
+   *     DoFn.ProcessElement} must return {@code void}.
+   * <li>If the {@link DoFn} (or any of its supertypes) is annotated as {@link
+   *     DoFn.BoundedPerElement} or {@link DoFn.UnboundedPerElement}, use that. Only one of
+   *     these must be specified.
+   * <li>If {@link DoFn.ProcessElement} returns {@link DoFn.ProcessContinuation}, assume it is
+   *     unbounded. Otherwise (if it returns {@code void}), assume it is bounded.
+   * <li>If {@link DoFn.ProcessElement} returns {@code void}, but the {@link DoFn} is annotated
+   *     {@link DoFn.UnboundedPerElement}, this is an error.
+   * </ol>
+   */
+  private static PCollection.IsBounded inferBoundedness(
+      TypeToken<? extends DoFn> fnToken,
+      DoFnSignature.ProcessElementMethod processElement,
+      ErrorReporter errors) {
+    PCollection.IsBounded isBounded = null;
+    for (TypeToken<?> supertype : fnToken.getTypes()) {
+      if (supertype.getRawType().isAnnotationPresent(DoFn.BoundedPerElement.class)
+          || supertype.getRawType().isAnnotationPresent(DoFn.UnboundedPerElement.class)) {
+        errors.checkArgument(
+            isBounded == null,
+            "Both @%s and @%s specified",
+            DoFn.BoundedPerElement.class.getSimpleName(),
+            DoFn.UnboundedPerElement.class.getSimpleName());
+        isBounded =
+            supertype.getRawType().isAnnotationPresent(DoFn.BoundedPerElement.class)
+                ? PCollection.IsBounded.BOUNDED
+                : PCollection.IsBounded.UNBOUNDED;
+      }
+    }
+    if (processElement.isSplittable()) {
+      if (isBounded == null) {
+        isBounded =
+            processElement.hasReturnValue()
+                ? PCollection.IsBounded.UNBOUNDED
+                : PCollection.IsBounded.BOUNDED;
+      }
+    } else {
+      errors.checkArgument(
+          isBounded == null,
+          "Non-splittable, but annotated as @"
+              + ((isBounded == PCollection.IsBounded.BOUNDED)
+                  ? DoFn.BoundedPerElement.class.getSimpleName()
+                  : DoFn.UnboundedPerElement.class.getSimpleName()));
+      checkState(!processElement.hasReturnValue(), "Should have been inferred splittable");
+      isBounded = PCollection.IsBounded.BOUNDED;
+    }
+    return isBounded;
+  }
+
+  /**
+   * Verifies properties related to methods of splittable {@link DoFn}:
+   *
+   * <ul>
+   * <li>Must declare the required {@link DoFn.GetInitialRestriction} and {@link DoFn.NewTracker}
+   *     methods.
+   * <li>Types of restrictions and trackers must match exactly between {@link DoFn.ProcessElement},
+   *     {@link DoFn.GetInitialRestriction}, {@link DoFn.NewTracker}, {@link
+   *     DoFn.GetRestrictionCoder}, {@link DoFn.SplitRestriction}.
+   * </ul>
+   */
+  private static void verifySplittableMethods(DoFnSignature signature, ErrorReporter errors) {
+    DoFnSignature.ProcessElementMethod processElement = signature.processElement();
+    DoFnSignature.GetInitialRestrictionMethod getInitialRestriction =
+        signature.getInitialRestriction();
+    DoFnSignature.NewTrackerMethod newTracker = signature.newTracker();
+    DoFnSignature.GetRestrictionCoderMethod getRestrictionCoder = signature.getRestrictionCoder();
+    DoFnSignature.SplitRestrictionMethod splitRestriction = signature.splitRestriction();
+
+    ErrorReporter processElementErrors =
+        errors.forMethod(DoFn.ProcessElement.class, processElement.targetMethod());
+
+    List<String> missingRequiredMethods = new ArrayList<>();
+    if (getInitialRestriction == null) {
+      missingRequiredMethods.add("@" + DoFn.GetInitialRestriction.class.getSimpleName());
+    }
+    if (newTracker == null) {
+      missingRequiredMethods.add("@" + DoFn.NewTracker.class.getSimpleName());
+    }
+    if (!missingRequiredMethods.isEmpty()) {
+      processElementErrors.throwIllegalArgument(
+          "Splittable, but does not define the following required methods: %s",
+          missingRequiredMethods);
+    }
+
+    processElementErrors.checkArgument(
+        processElement.trackerT().equals(newTracker.trackerT()),
+        "Has tracker type %s, but @%s method %s uses tracker type %s",
+        formatType(processElement.trackerT()),
+        DoFn.NewTracker.class.getSimpleName(),
+        format(newTracker.targetMethod()),
+        formatType(newTracker.trackerT()));
+
+    ErrorReporter getInitialRestrictionErrors =
+        errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestriction.targetMethod());
+    TypeToken<?> restrictionT = getInitialRestriction.restrictionT();
+
+    getInitialRestrictionErrors.checkArgument(
+        restrictionT.equals(newTracker.restrictionT()),
+        "Uses restriction type %s, but @%s method %s uses restriction type %s",
+        formatType(restrictionT),
+        DoFn.NewTracker.class.getSimpleName(),
+        format(newTracker.targetMethod()),
+        formatType(newTracker.restrictionT()));
+
+    if (getRestrictionCoder != null) {
+      getInitialRestrictionErrors.checkArgument(
+          getRestrictionCoder.coderT().isSubtypeOf(coderTypeOf(restrictionT)),
+          "Uses restriction type %s, but @%s method %s returns %s "
+              + "which is not a subtype of %s",
+          formatType(restrictionT),
+          DoFn.GetRestrictionCoder.class.getSimpleName(),
+          format(getRestrictionCoder.targetMethod()),
+          formatType(getRestrictionCoder.coderT()),
+          formatType(coderTypeOf(restrictionT)));
+    }
+
+    if (splitRestriction != null) {
+      getInitialRestrictionErrors.checkArgument(
+          splitRestriction.restrictionT().equals(restrictionT),
+          "Uses restriction type %s, but @%s method %s uses restriction type %s",
+          formatType(restrictionT),
+          DoFn.SplitRestriction.class.getSimpleName(),
+          format(splitRestriction.targetMethod()),
+          formatType(splitRestriction.restrictionT()));
+    }
+  }
+
+  /**
+   * Verifies that a non-splittable {@link DoFn} does not declare any methods that only make sense
+   * for splittable {@link DoFn}: {@link DoFn.GetInitialRestriction}, {@link DoFn.SplitRestriction},
+   * {@link DoFn.NewTracker}, {@link DoFn.GetRestrictionCoder}.
+   */
+  private static void verifyUnsplittableMethods(ErrorReporter errors, DoFnSignature signature) {
+    List<String> forbiddenMethods = new ArrayList<>();
+    if (signature.getInitialRestriction() != null) {
+      forbiddenMethods.add("@" + DoFn.GetInitialRestriction.class.getSimpleName());
+    }
+    if (signature.splitRestriction() != null) {
+      forbiddenMethods.add("@" + DoFn.SplitRestriction.class.getSimpleName());
+    }
+    if (signature.newTracker() != null) {
+      forbiddenMethods.add("@" + DoFn.NewTracker.class.getSimpleName());
+    }
+    if (signature.getRestrictionCoder() != null) {
+      forbiddenMethods.add("@" + DoFn.GetRestrictionCoder.class.getSimpleName());
+    }
+    errors.checkArgument(
+        forbiddenMethods.isEmpty(), "Non-splittable, but defines methods: %s", forbiddenMethods);
   }
 
   /**
@@ -166,7 +385,11 @@ public class DoFnSignatures {
       Method m,
       TypeToken<?> inputT,
       TypeToken<?> outputT) {
-    errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void");
+    errors.checkArgument(
+        void.class.equals(m.getReturnType())
+            || DoFn.ProcessContinuation.class.equals(m.getReturnType()),
+        "Must return void or %s",
+        DoFn.ProcessContinuation.class.getSimpleName());
 
     TypeToken<?> processContextToken = doFnProcessContextTypeOf(inputT, outputT);
 
@@ -181,6 +404,7 @@ public class DoFnSignatures {
         formatType(processContextToken));
 
     List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
+    TypeToken<?> trackerT = null;
 
     TypeToken<?> expectedInputProviderT = inputProviderTypeOf(inputT);
     TypeToken<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
@@ -190,38 +414,62 @@ public class DoFnSignatures {
       if (rawType.equals(BoundedWindow.class)) {
         errors.checkArgument(
             !extraParameters.contains(DoFnSignature.Parameter.BOUNDED_WINDOW),
-            "Multiple BoundedWindow parameters");
+            "Multiple %s parameters",
+            BoundedWindow.class.getSimpleName());
         extraParameters.add(DoFnSignature.Parameter.BOUNDED_WINDOW);
       } else if (rawType.equals(DoFn.InputProvider.class)) {
         errors.checkArgument(
             !extraParameters.contains(DoFnSignature.Parameter.INPUT_PROVIDER),
-            "Multiple InputProvider parameters");
+            "Multiple %s parameters",
+            DoFn.InputProvider.class.getSimpleName());
         errors.checkArgument(
             paramT.equals(expectedInputProviderT),
-            "Wrong type of InputProvider parameter: %s, should be %s",
+            "Wrong type of %s parameter: %s, should be %s",
+            DoFn.InputProvider.class.getSimpleName(),
             formatType(paramT),
             formatType(expectedInputProviderT));
         extraParameters.add(DoFnSignature.Parameter.INPUT_PROVIDER);
       } else if (rawType.equals(DoFn.OutputReceiver.class)) {
         errors.checkArgument(
             !extraParameters.contains(DoFnSignature.Parameter.OUTPUT_RECEIVER),
-            "Multiple OutputReceiver parameters");
+            "Multiple %s parameters",
+            DoFn.OutputReceiver.class.getSimpleName());
         errors.checkArgument(
             paramT.equals(expectedOutputReceiverT),
-            "Wrong type of OutputReceiver parameter: %s, should be %s",
+            "Wrong type of %s parameter: %s, should be %s",
+            DoFn.OutputReceiver.class.getSimpleName(),
             formatType(paramT),
             formatType(expectedOutputReceiverT));
         extraParameters.add(DoFnSignature.Parameter.OUTPUT_RECEIVER);
+      } else if (RestrictionTracker.class.isAssignableFrom(rawType)) {
+        errors.checkArgument(
+            !extraParameters.contains(DoFnSignature.Parameter.RESTRICTION_TRACKER),
+            "Multiple %s parameters",
+            RestrictionTracker.class.getSimpleName());
+        extraParameters.add(DoFnSignature.Parameter.RESTRICTION_TRACKER);
+        trackerT = paramT;
       } else {
         List<String> allowedParamTypes =
-            Arrays.asList(formatType(new TypeToken<BoundedWindow>() {}));
+            Arrays.asList(
+                formatType(new TypeToken<BoundedWindow>() {}),
+                formatType(new TypeToken<RestrictionTracker<?>>() {}));
         errors.throwIllegalArgument(
             "%s is not a valid context parameter. Should be one of %s",
             formatType(paramT), allowedParamTypes);
       }
     }
 
-    return DoFnSignature.ProcessElementMethod.create(m, extraParameters);
+    // A splittable DoFn can not have any other extra context parameters.
+    if (extraParameters.contains(DoFnSignature.Parameter.RESTRICTION_TRACKER)) {
+      errors.checkArgument(
+          extraParameters.size() == 1,
+          "Splittable DoFn must not have any extra context arguments apart from %s, but has: %s",
+          trackerT,
+          extraParameters);
+    }
+
+    return DoFnSignature.ProcessElementMethod.create(
+        m, extraParameters, trackerT, DoFn.ProcessContinuation.class.equals(m.getReturnType()));
   }
 
   @VisibleForTesting
@@ -248,6 +496,100 @@ public class DoFnSignatures {
     return DoFnSignature.LifecycleMethod.create(m);
   }
 
+  @VisibleForTesting
+  static DoFnSignature.GetInitialRestrictionMethod analyzeGetInitialRestrictionMethod(
+      ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT) {
+    // Method is of the form:
+    // @GetInitialRestriction
+    // RestrictionT getInitialRestriction(InputT element);
+    Type[] params = m.getGenericParameterTypes();
+    errors.checkArgument(
+        params.length == 1 && fnToken.resolveType(params[0]).equals(inputT),
+        "Must take a single argument of type %s",
+        formatType(inputT));
+    return DoFnSignature.GetInitialRestrictionMethod.create(
+        m, fnToken.resolveType(m.getGenericReturnType()));
+  }
+
+  /** Generates a type token for {@code List<T>} given {@code T}. */
+  private static <T> TypeToken<List<T>> listTypeOf(TypeToken<T> elementT) {
+    return new TypeToken<List<T>>() {}.where(new TypeParameter<T>() {}, elementT);
+  }
+
+  @VisibleForTesting
+  static DoFnSignature.SplitRestrictionMethod analyzeSplitRestrictionMethod(
+      ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT) {
+    // Method is of the form:
+    // @SplitRestriction
+    // void splitRestriction(InputT element, RestrictionT restriction);
+    errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void");
+
+    Type[] params = m.getGenericParameterTypes();
+    errors.checkArgument(params.length == 3, "Must have exactly 3 arguments");
+    errors.checkArgument(
+        fnToken.resolveType(params[0]).equals(inputT),
+        "First argument must be the element type %s",
+        formatType(inputT));
+
+    TypeToken<?> restrictionT = fnToken.resolveType(params[1]);
+    TypeToken<?> receiverT = fnToken.resolveType(params[2]);
+    TypeToken<?> expectedReceiverT = outputReceiverTypeOf(restrictionT);
+    errors.checkArgument(
+        receiverT.equals(expectedReceiverT),
+        "Third argument must be %s, but is %s",
+        formatType(expectedReceiverT),
+        formatType(receiverT));
+
+    return DoFnSignature.SplitRestrictionMethod.create(m, restrictionT);
+  }
+
+  /** Generates a type token for {@code Coder<T>} given {@code T}. */
+  private static <T> TypeToken<Coder<T>> coderTypeOf(TypeToken<T> elementT) {
+    return new TypeToken<Coder<T>>() {}.where(new TypeParameter<T>() {}, elementT);
+  }
+
+  @VisibleForTesting
+  static DoFnSignature.GetRestrictionCoderMethod analyzeGetRestrictionCoderMethod(
+      ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m) {
+    errors.checkArgument(m.getParameterTypes().length == 0, "Must have zero arguments");
+    TypeToken<?> resT = fnToken.resolveType(m.getGenericReturnType());
+    errors.checkArgument(
+        resT.isSubtypeOf(TypeToken.of(Coder.class)),
+        "Must return a Coder, but returns %s",
+        formatType(resT));
+    return DoFnSignature.GetRestrictionCoderMethod.create(m, resT);
+  }
+
+  /**
+   * Generates a type token for {@code RestrictionTracker<RestrictionT>} given {@code RestrictionT}.
+   */
+  private static <RestrictionT>
+      TypeToken<RestrictionTracker<RestrictionT>> restrictionTrackerTypeOf(
+          TypeToken<RestrictionT> restrictionT) {
+    return new TypeToken<RestrictionTracker<RestrictionT>>() {}.where(
+        new TypeParameter<RestrictionT>() {}, restrictionT);
+  }
+
+  @VisibleForTesting
+  static DoFnSignature.NewTrackerMethod analyzeNewTrackerMethod(
+      ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m) {
+    // Method is of the form:
+    // @NewTracker
+    // TrackerT newTracker(RestrictionT restriction);
+    Type[] params = m.getGenericParameterTypes();
+    errors.checkArgument(params.length == 1, "Must have a single argument");
+
+    TypeToken<?> restrictionT = fnToken.resolveType(params[0]);
+    TypeToken<?> trackerT = fnToken.resolveType(m.getGenericReturnType());
+    TypeToken<?> expectedTrackerT = restrictionTrackerTypeOf(restrictionT);
+    errors.checkArgument(
+        trackerT.isSubtypeOf(expectedTrackerT),
+        "Returns %s, but must return a subtype of %s",
+        formatType(trackerT),
+        formatType(expectedTrackerT));
+    return DoFnSignature.NewTrackerMethod.create(m, restrictionT, trackerT);
+  }
+
   private static Collection<Method> declaredMethodsWithAnnotation(
       Class<? extends Annotation> anno, Class<?> startClass, Class<?> stopClass) {
     Collection<Method> matches = new ArrayList<>();
@@ -310,7 +652,7 @@ public class DoFnSignatures {
   }
 
   private static String format(Method method) {
-    return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(method);
+    return ReflectHelpers.METHOD_FORMATTER.apply(method);
   }
 
   private static String formatType(TypeToken<?> t) {
@@ -327,7 +669,9 @@ public class DoFnSignatures {
     ErrorReporter forMethod(Class<? extends Annotation> annotation, Method method) {
       return new ErrorReporter(
           this,
-          String.format("@%s %s", annotation, (method == null) ? "(absent)" : format(method)));
+          String.format(
+              "@%s %s",
+              annotation.getSimpleName(), (method == null) ? "(absent)" : format(method)));
     }
 
     void throwIllegalArgument(String message, Object... args) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
new file mode 100644
index 0000000..6b249ee
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Manages concurrent access to the restriction and keeps track of its claimed part for a <a
+ * href="https://s.apache.org/splittable-do-fn>splittable</a> {@link DoFn}.
+ */
+public interface RestrictionTracker<RestrictionT> {
+  /**
+   * Returns a restriction accurately describing the full range of work the current {@link
+   * DoFn.ProcessElement} call will do, including already completed work.
+   */
+  RestrictionT currentRestriction();
+
+  /**
+   * Signals that the current {@link DoFn.ProcessElement} call should terminate as soon as possible.
+   * Modifies {@link #currentRestriction}. Returns a restriction representing the rest of the work:
+   * the old value of {@link #currentRestriction} is equivalent to the new value and the return
+   * value of this method combined.
+   */
+  RestrictionT checkpoint();
+
+  // TODO: Add the more general splitRemainderAfterFraction() and other methods.
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java
new file mode 100644
index 0000000..1ceb880
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines utilities related to <a href="https://s.apache.org/splittable-do-fn>splittable</a>
+ * {@link org.apache.beam.sdk.transforms.DoFn}'s.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
index f0f7d22..436e227 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
@@ -17,11 +17,9 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
@@ -31,40 +29,55 @@ import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * Test case for {@link KvCoder}.
- */
+/** Test case for {@link KvCoder}. */
 @RunWith(JUnit4.class)
 public class KvCoderTest {
+  private static class CoderAndData<T> {
+    Coder<T> coder;
+    List<T> data;
+  }
+
+  private static class AnyCoderAndData {
+    private CoderAndData<?> coderAndData;
+  }
 
-  private static final Map<Coder<?>, Iterable<?>> TEST_DATA =
-      new ImmutableMap.Builder<Coder<?>, Iterable<?>>()
-      .put(VarIntCoder.of(),
-          Arrays.asList(-1, 0, 1, 13, Integer.MAX_VALUE, Integer.MIN_VALUE))
-      .put(BigEndianLongCoder.of(),
-          Arrays.asList(-1L, 0L, 1L, 13L, Long.MAX_VALUE, Long.MIN_VALUE))
-      .put(StringUtf8Coder.of(),
-          Arrays.asList("", "hello", "goodbye", "1"))
-      .put(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
-          Arrays.asList(KV.of("", -1), KV.of("hello", 0), KV.of("goodbye", Integer.MAX_VALUE)))
-      .put(ListCoder.of(VarLongCoder.of()),
-          Arrays.asList(
-              Arrays.asList(1L, 2L, 3L),
-              Collections.emptyList()))
-       .build();
+  private static <T> AnyCoderAndData coderAndData(Coder<T> coder, List<T> data) {
+    CoderAndData<T> coderAndData = new CoderAndData<>();
+    coderAndData.coder = coder;
+    coderAndData.data = data;
+    AnyCoderAndData res = new AnyCoderAndData();
+    res.coderAndData = coderAndData;
+    return res;
+  }
+
+  private static final List<AnyCoderAndData> TEST_DATA =
+      Arrays.asList(
+          coderAndData(
+              VarIntCoder.of(), Arrays.asList(-1, 0, 1, 13, Integer.MAX_VALUE, Integer.MIN_VALUE)),
+          coderAndData(
+              BigEndianLongCoder.of(),
+              Arrays.asList(-1L, 0L, 1L, 13L, Long.MAX_VALUE, Long.MIN_VALUE)),
+          coderAndData(StringUtf8Coder.of(), Arrays.asList("", "hello", "goodbye", "1")),
+          coderAndData(
+              KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
+              Arrays.asList(KV.of("", -1), KV.of("hello", 0), KV.of("goodbye", Integer.MAX_VALUE))),
+          coderAndData(
+              ListCoder.of(VarLongCoder.of()),
+              Arrays.asList(Arrays.asList(1L, 2L, 3L), Collections.<Long>emptyList())));
 
   @Test
+  @SuppressWarnings("rawtypes")
   public void testDecodeEncodeEqual() throws Exception {
-    for (Map.Entry<Coder<?>, Iterable<?>> entry : TEST_DATA.entrySet()) {
-      // The coder and corresponding values must be the same type.
-      // If someone messes this up in the above test data, the test
-      // will fail anyhow (unless the coder magically works on data
-      // it does not understand).
-      @SuppressWarnings("unchecked")
-      Coder<Object> coder = (Coder<Object>) entry.getKey();
-      Iterable<?> values = entry.getValue();
-      for (Object value : values) {
-        CoderProperties.coderDecodeEncodeEqual(coder, value);
+    for (AnyCoderAndData keyCoderAndData : TEST_DATA) {
+      Coder keyCoder = keyCoderAndData.coderAndData.coder;
+      for (Object key : keyCoderAndData.coderAndData.data) {
+        for (AnyCoderAndData valueCoderAndData : TEST_DATA) {
+          Coder valueCoder = valueCoderAndData.coderAndData.coder;
+          for (Object value : valueCoderAndData.coderAndData.data) {
+            CoderProperties.coderDecodeEncodeEqual(
+                KvCoder.of(keyCoder, valueCoder), KV.of(key, value));
+          }
+        }
       }
     }
   }
@@ -75,37 +88,29 @@ public class KvCoderTest {
   @Test
   public void testEncodingId() throws Exception {
     CoderProperties.coderHasEncodingId(
-        KvCoder.of(VarIntCoder.of(), VarIntCoder.of()),
-        EXPECTED_ENCODING_ID);
+        KvCoder.of(VarIntCoder.of(), VarIntCoder.of()), EXPECTED_ENCODING_ID);
   }
 
-  /**
-   * Homogeneously typed test value for ease of use with the wire format test utility.
-   */
+  /** Homogeneously typed test value for ease of use with the wire format test utility. */
   private static final Coder<KV<String, Integer>> TEST_CODER =
       KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
 
-  private static final List<KV<String, Integer>> TEST_VALUES = Arrays.asList(
-      KV.of("", -1),
-      KV.of("hello", 0),
-      KV.of("goodbye", Integer.MAX_VALUE));
+  private static final List<KV<String, Integer>> TEST_VALUES =
+      Arrays.asList(KV.of("", -1), KV.of("hello", 0), KV.of("goodbye", Integer.MAX_VALUE));
 
   /**
-   * Generated data to check that the wire format has not changed. To regenerate, see
-   * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
+   * Generated data to check that the wire format has not changed. To regenerate, see {@link
+   * org.apache.beam.sdk.coders.PrintBase64Encodings}.
    */
-  private static final List<String> TEST_ENCODINGS = Arrays.asList(
-      "AP____8P",
-      "BWhlbGxvAA",
-      "B2dvb2RieWX_____Bw");
+  private static final List<String> TEST_ENCODINGS =
+      Arrays.asList("AP____8P", "BWhlbGxvAA", "B2dvb2RieWX_____Bw");
 
   @Test
   public void testWireFormatEncode() throws Exception {
     CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
   }
 
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
+  @Rule public ExpectedException thrown = ExpectedException.none();
 
   @Test
   public void encodeNullThrowsCoderException() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 7ce98bc..9c7b991 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -56,6 +56,7 @@ import org.apache.beam.sdk.transforms.ParDo.Bound;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -1469,4 +1470,49 @@ public class ParDoTest implements Serializable {
     assertThat(displayData, includesDisplayDataFrom(fn));
     assertThat(displayData, hasDisplayItem("fn", fn.getClass()));
   }
+
+  private abstract static class SomeTracker implements RestrictionTracker<Object> {}
+  private static class TestSplittableDoFn extends DoFn<Integer, String> {
+    @ProcessElement
+    public void processElement(ProcessContext context, SomeTracker tracker) {}
+
+    @GetInitialRestriction
+    public Object getInitialRestriction(Integer element) {
+      return null;
+    }
+
+    @NewTracker
+    public SomeTracker newTracker(Object restriction) {
+      return null;
+    }
+  }
+
+  @Test
+  public void testRejectsSplittableDoFnByDefault() {
+    // ParDo with a splittable DoFn must be overridden by the runner.
+    // Without an override, applying it directly must fail.
+    Pipeline p = TestPipeline.create();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Splittable DoFn not supported by the current runner");
+
+    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new TestSplittableDoFn()));
+  }
+
+  @Test
+  public void testMultiRejectsSplittableDoFnByDefault() {
+    // ParDo with a splittable DoFn must be overridden by the runner.
+    // Without an override, applying it directly must fail.
+    Pipeline p = TestPipeline.create();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Splittable DoFn not supported by the current runner");
+
+    p.apply(Create.of(1, 2, 3))
+        .apply(
+            ParDo.of(new TestSplittableDoFn())
+                .withOutputTags(
+                    new TupleTag<String>("main") {},
+                    TupleTagList.of(new TupleTag<String>("side1") {})));
+  }
 }


[4/4] incubator-beam git commit: Closes #896

Posted by bc...@apache.org.
Closes #896


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/13b45895
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/13b45895
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/13b45895

Branch: refs/heads/master
Commit: 13b45895eb13cd48557472463404bd097d7097d7
Parents: a5d1293 a0a2488
Author: bchambers <bc...@google.com>
Authored: Wed Oct 12 17:29:21 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Oct 12 17:29:21 2016 -0700

----------------------------------------------------------------------
 runners/core-java/pom.xml                       |   6 +
 .../runners/core/ElementAndRestriction.java     |  42 ++
 .../core/ElementAndRestrictionCoder.java        |  67 +++
 .../runners/core/GBKIntoKeyedWorkItems.java     |  40 ++
 .../beam/runners/core/SplittableParDo.java      | 469 ++++++++++++++++
 .../core/ElementAndRestrictionCoderTest.java    | 127 +++++
 .../beam/runners/core/SplittableParDoTest.java  | 467 ++++++++++++++++
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java |  66 +++
 .../beam/runners/direct/DirectRunner.java       |   5 +
 .../runners/direct/ParDoOverrideFactory.java    |  55 ++
 .../beam/runners/direct/SplittableDoFnTest.java | 225 ++++++++
 .../beam/sdk/annotations/Experimental.java      |   8 +-
 .../org/apache/beam/sdk/transforms/DoFn.java    | 218 +++++++-
 .../beam/sdk/transforms/DoFnAdapters.java       |  27 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  | 117 ++--
 .../org/apache/beam/sdk/transforms/ParDo.java   |  19 +
 .../sdk/transforms/reflect/DoFnInvoker.java     |  26 +-
 .../sdk/transforms/reflect/DoFnInvokers.java    | 179 +++++-
 .../sdk/transforms/reflect/DoFnSignature.java   | 114 +++-
 .../sdk/transforms/reflect/DoFnSignatures.java  | 366 ++++++++++++-
 .../splittabledofn/RestrictionTracker.java      |  42 ++
 .../transforms/splittabledofn/package-info.java |  22 +
 .../org/apache/beam/sdk/coders/KvCoderTest.java |  99 ++--
 .../apache/beam/sdk/transforms/ParDoTest.java   |  46 ++
 .../transforms/reflect/DoFnInvokersTest.java    | 261 ++++++++-
 .../DoFnSignaturesProcessElementTest.java       |   4 +-
 .../DoFnSignaturesSplittableDoFnTest.java       | 543 +++++++++++++++++++
 27 files changed, 3495 insertions(+), 165 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-beam git commit: [BEAM-65] SplittableDoFn prototype.

Posted by bc...@apache.org.
[BEAM-65] SplittableDoFn prototype.

Work in progress. Currently only runs in direct runner,
and not ready for any use by real users.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a0a24883
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a0a24883
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a0a24883

Branch: refs/heads/master
Commit: a0a24883737850052f54290255780e868c0b63dc
Parents: a5d1293
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 11 17:13:53 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Oct 12 17:29:20 2016 -0700

----------------------------------------------------------------------
 runners/core-java/pom.xml                       |   6 +
 .../runners/core/ElementAndRestriction.java     |  42 ++
 .../core/ElementAndRestrictionCoder.java        |  67 +++
 .../runners/core/GBKIntoKeyedWorkItems.java     |  40 ++
 .../beam/runners/core/SplittableParDo.java      | 469 ++++++++++++++++
 .../core/ElementAndRestrictionCoderTest.java    | 127 +++++
 .../beam/runners/core/SplittableParDoTest.java  | 467 ++++++++++++++++
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java |  66 +++
 .../beam/runners/direct/DirectRunner.java       |   5 +
 .../runners/direct/ParDoOverrideFactory.java    |  55 ++
 .../beam/runners/direct/SplittableDoFnTest.java | 225 ++++++++
 .../beam/sdk/annotations/Experimental.java      |   8 +-
 .../org/apache/beam/sdk/transforms/DoFn.java    | 218 +++++++-
 .../beam/sdk/transforms/DoFnAdapters.java       |  27 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  | 117 ++--
 .../org/apache/beam/sdk/transforms/ParDo.java   |  19 +
 .../sdk/transforms/reflect/DoFnInvoker.java     |  26 +-
 .../sdk/transforms/reflect/DoFnInvokers.java    | 179 +++++-
 .../sdk/transforms/reflect/DoFnSignature.java   | 114 +++-
 .../sdk/transforms/reflect/DoFnSignatures.java  | 366 ++++++++++++-
 .../splittabledofn/RestrictionTracker.java      |  42 ++
 .../transforms/splittabledofn/package-info.java |  22 +
 .../org/apache/beam/sdk/coders/KvCoderTest.java |  99 ++--
 .../apache/beam/sdk/transforms/ParDoTest.java   |  46 ++
 .../transforms/reflect/DoFnInvokersTest.java    | 261 ++++++++-
 .../DoFnSignaturesProcessElementTest.java       |   4 +-
 .../DoFnSignaturesSplittableDoFnTest.java       | 543 +++++++++++++++++++
 27 files changed, 3495 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index d958dd2..d84c420 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -190,6 +190,12 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>annotations</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java
new file mode 100644
index 0000000..4a5d0c4
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * A tuple of an element and a restriction applied to processing it with a
+ * <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+ */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+@AutoValue
+public abstract class ElementAndRestriction<ElementT, RestrictionT> {
+  /** The element to process. */
+  public abstract ElementT element();
+
+  /** The restriction applied to processing the element. */
+  public abstract RestrictionT restriction();
+
+  /** Constructs the {@link ElementAndRestriction}. */
+  public static <InputT, RestrictionT> ElementAndRestriction<InputT, RestrictionT> of(
+      InputT element, RestrictionT restriction) {
+    return new AutoValue_ElementAndRestriction<>(element, restriction);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
new file mode 100644
index 0000000..6dec8e2
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+
+/** A {@link Coder} for {@link ElementAndRestriction}. */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+public class ElementAndRestrictionCoder<ElementT, RestrictionT>
+    extends CustomCoder<ElementAndRestriction<ElementT, RestrictionT>> {
+  private final Coder<ElementT> elementCoder;
+  private final Coder<RestrictionT> restrictionCoder;
+
+  /**
+   * Creates an {@link ElementAndRestrictionCoder} from an element coder and a restriction coder.
+   */
+  public static <ElementT, RestrictionT> ElementAndRestrictionCoder<ElementT, RestrictionT> of(
+      Coder<ElementT> elementCoder, Coder<RestrictionT> restrictionCoder) {
+    return new ElementAndRestrictionCoder<>(elementCoder, restrictionCoder);
+  }
+
+  private ElementAndRestrictionCoder(
+      Coder<ElementT> elementCoder, Coder<RestrictionT> restrictionCoder) {
+    this.elementCoder = elementCoder;
+    this.restrictionCoder = restrictionCoder;
+  }
+
+  @Override
+  public void encode(
+      ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream, Context context)
+      throws IOException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null ElementAndRestriction");
+    }
+    elementCoder.encode(value.element(), outStream, context.nested());
+    restrictionCoder.encode(value.restriction(), outStream, context);
+  }
+
+  @Override
+  public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream, Context context)
+      throws IOException {
+    ElementT key = elementCoder.decode(inStream, context.nested());
+    RestrictionT value = restrictionCoder.decode(inStream, context);
+    return ElementAndRestriction.of(key, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
new file mode 100644
index 0000000..ca4d681
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Interface for creating a runner-specific {@link GroupByKey GroupByKey-like} {@link PTransform}
+ * that produces {@link KeyedWorkItem KeyedWorkItems} so that downstream transforms can access state
+ * and timers.
+ */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+public class GBKIntoKeyedWorkItems<KeyT, InputT>
+    extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
+  @Override
+  public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
+    return PCollection.createPrimitiveOutputInternal(
+        input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
new file mode 100644
index 0000000..7645149
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import java.util.UUID;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * A utility transform that executes a <a
+ * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} by expanding it into a
+ * network of simpler transforms:
+ *
+ * <ol>
+ * <li>Pair each element with an initial restriction
+ * <li>Split each restriction into sub-restrictions
+ * <li>Assign a unique key to each element/restriction pair
+ * <li>Group by key (so that work is partitioned by key and we can access state/timers)
+ * <li>Process each keyed element/restriction pair with the splittable {@link DoFn}'s {@link
+ *     DoFn.ProcessElement} method, using state and timers API.
+ * </ol>
+ *
+ * <p>This transform is intended as a helper for internal use by runners when implementing {@code
+ * ParDo.of(splittable DoFn)}, but not for direct use by pipeline writers.
+ */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+public class SplittableParDo<
+        InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+    extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
+  private final DoFn<InputT, OutputT> fn;
+  private final DoFnSignature signature;
+
+  /**
+   * Creates the transform for the given original {@link ParDo} and {@link DoFn}.
+   *
+   * @param fn The splittable {@link DoFn} inside the original {@link ParDo} transform.
+   */
+  public SplittableParDo(DoFn<InputT, OutputT> fn) {
+    checkNotNull(fn, "fn must not be null");
+    this.fn = fn;
+    this.signature = DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass());
+    checkArgument(signature.processElement().isSplittable(), "fn must be a splittable DoFn");
+  }
+
+  @Override
+  public PCollection<OutputT> apply(PCollection<InputT> input) {
+    PCollection.IsBounded isFnBounded = signature.isBoundedPerElement();
+    Coder<RestrictionT> restrictionCoder =
+        DoFnInvokers.INSTANCE
+            .newByteBuddyInvoker(fn)
+            .invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());
+    Coder<ElementAndRestriction<InputT, RestrictionT>> splitCoder =
+        ElementAndRestrictionCoder.of(input.getCoder(), restrictionCoder);
+
+    PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> keyedWorkItems =
+        input
+            .apply(
+                "Pair with initial restriction",
+                ParDo.of(new PairWithRestrictionFn<InputT, OutputT, RestrictionT>(fn)))
+            .setCoder(splitCoder)
+            .apply("Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(fn)))
+            .setCoder(splitCoder)
+            .apply(
+                "Assign unique key",
+                WithKeys.of(new RandomUniqueKeyFn<ElementAndRestriction<InputT, RestrictionT>>()))
+            .apply(
+                "Group by key",
+                new GBKIntoKeyedWorkItems<String, ElementAndRestriction<InputT, RestrictionT>>());
+    checkArgument(
+        keyedWorkItems.getWindowingStrategy().getWindowFn() instanceof GlobalWindows,
+        "GBKIntoKeyedWorkItems must produce a globally windowed collection, "
+            + "but windowing strategy was: %s",
+        keyedWorkItems.getWindowingStrategy());
+    return keyedWorkItems
+        .apply(
+            "Process",
+            ParDo.of(
+                new ProcessFn<InputT, OutputT, RestrictionT, TrackerT>(
+                    fn,
+                    input.getCoder(),
+                    restrictionCoder,
+                    input.getWindowingStrategy().getWindowFn().windowCoder())))
+        .setIsBoundedInternal(input.isBounded().and(isFnBounded))
+        .setWindowingStrategyInternal(input.getWindowingStrategy());
+  }
+
+  /**
+   * Assigns a random unique key to each element of the input collection, so that the output
+   * collection is effectively the same elements as input, but the per-key state and timers are now
+   * effectively per-element.
+   */
+  private static class RandomUniqueKeyFn<T> implements SerializableFunction<T, String> {
+    @Override
+    public String apply(T input) {
+      return UUID.randomUUID().toString();
+    }
+  }
+
+  /**
+   * Pairs each input element with its initial restriction using the given splittable {@link DoFn}.
+   */
+  private static class PairWithRestrictionFn<InputT, OutputT, RestrictionT>
+      extends DoFn<InputT, ElementAndRestriction<InputT, RestrictionT>> {
+    private DoFn<InputT, OutputT> fn;
+    private transient DoFnInvoker<InputT, OutputT> invoker;
+
+    PairWithRestrictionFn(DoFn<InputT, OutputT> fn) {
+      this.fn = fn;
+    }
+
+    @Setup
+    public void setup() {
+      invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      context.output(
+          ElementAndRestriction.of(
+              context.element(),
+              invoker.<RestrictionT>invokeGetInitialRestriction(context.element())));
+    }
+  }
+
+  /**
+   * The heart of splittable {@link DoFn} execution: processes a single (element, restriction) pair
+   * by creating a tracker for the restriction and checkpointing/resuming processing later if
+   * necessary.
+   *
+   * <p>TODO: This uses deprecated OldDoFn since DoFn does not provide access to state/timer
+   * internals. This should be rewritten to use the <a href="https://s.apache.org/beam-state">State
+   * and Timers API</a> once it is available.
+   */
+  @VisibleForTesting
+  static class ProcessFn<
+          InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+      extends OldDoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
+    // Commit at least once every 10k output records.  This keeps the watermark advancing
+    // smoothly, and ensures that not too much work will have to be reprocessed in the event of
+    // a crash.
+    // TODO: Also commit at least once every N seconds (runner-specific parameter).
+    @VisibleForTesting static final int MAX_OUTPUTS_PER_BUNDLE = 10000;
+
+    /**
+     * The state cell containing a watermark hold for the output of this {@link DoFn}. The hold is
+     * acquired during the first {@link DoFn.ProcessElement} call for each element and restriction,
+     * and is released when the {@link DoFn.ProcessElement} call returns {@link
+     * DoFn.ProcessContinuation#stop}.
+     *
+     * <p>A hold is needed to avoid letting the output watermark immediately progress together with
+     * the input watermark when the first {@link DoFn.ProcessElement} call for this element
+     * completes.
+     *
+     * <p>The hold is updated with the future output watermark reported by ProcessContinuation.
+     */
+    private static final StateTag<Object, WatermarkHoldState<GlobalWindow>> watermarkHoldTag =
+        StateTags.makeSystemTagInternal(
+            StateTags.<GlobalWindow>watermarkStateInternal(
+                "hold", OutputTimeFns.outputAtLatestInputTimestamp()));
+
+    /**
+     * The state cell containing a copy of the element. Written during the first {@link
+     * DoFn.ProcessElement} call and read during subsequent calls in response to timer firings, when
+     * the original element is no longer available.
+     */
+    private final StateTag<Object, ValueState<WindowedValue<InputT>>> elementTag;
+
+    /**
+     * The state cell containing a restriction representing the unprocessed part of work for this
+     * element.
+     */
+    private StateTag<Object, ValueState<RestrictionT>> restrictionTag;
+
+    private final DoFn<InputT, OutputT> fn;
+    private final Coder<? extends BoundedWindow> windowCoder;
+
+    private transient DoFnInvoker<InputT, OutputT> invoker;
+
+    ProcessFn(
+        DoFn<InputT, OutputT> fn,
+        Coder<InputT> elementCoder,
+        Coder<RestrictionT> restrictionCoder,
+        Coder<? extends BoundedWindow> windowCoder) {
+      this.fn = fn;
+      this.windowCoder = windowCoder;
+      elementTag =
+          StateTags.value("element", WindowedValue.getFullCoder(elementCoder, this.windowCoder));
+      DoFnInvoker<InputT, OutputT> invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+      restrictionTag = StateTags.value("restriction", restrictionCoder);
+    }
+
+    @Override
+    public void setup() throws Exception {
+      invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+    }
+
+    @Override
+    public void processElement(final ProcessContext c) {
+      // Initialize state (element and restriction) depending on whether this is the seed call.
+      // The seed call is the first call for this element, which actually has the element.
+      // Subsequent calls are timer firings and the element has to be retrieved from the state.
+      TimerInternals.TimerData timer = Iterables.getOnlyElement(c.element().timersIterable(), null);
+      boolean isSeedCall = (timer == null);
+      StateNamespace stateNamespace = isSeedCall ? StateNamespaces.global() : timer.getNamespace();
+      ValueState<WindowedValue<InputT>> elementState =
+          c.windowingInternals().stateInternals().state(stateNamespace, elementTag);
+      ValueState<RestrictionT> restrictionState =
+          c.windowingInternals().stateInternals().state(stateNamespace, restrictionTag);
+      WatermarkHoldState<GlobalWindow> holdState =
+          c.windowingInternals().stateInternals().state(stateNamespace, watermarkHoldTag);
+
+      ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
+      if (isSeedCall) {
+        // The element and restriction are available in c.element().
+        WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue =
+            Iterables.getOnlyElement(c.element().elementsIterable());
+        WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().element());
+        elementState.write(element);
+        elementAndRestriction =
+            ElementAndRestriction.of(element, windowedValue.getValue().restriction());
+      } else {
+        // This is not the first ProcessElement call for this element/restriction - rather,
+        // this is a timer firing, so we need to fetch the element and restriction from state.
+        elementState.readLater();
+        restrictionState.readLater();
+        elementAndRestriction =
+            ElementAndRestriction.of(elementState.read(), restrictionState.read());
+      }
+
+      final TrackerT tracker = invoker.invokeNewTracker(elementAndRestriction.restriction());
+      @SuppressWarnings("unchecked")
+      final RestrictionT[] residual = (RestrictionT[]) new Object[1];
+      // TODO: Only let the call run for a limited amount of time, rather than simply
+      // producing a limited amount of output.
+      DoFn.ProcessContinuation cont =
+          invoker.invokeProcessElement(
+              makeContext(c, elementAndRestriction.element(), tracker, residual),
+              wrapTracker(tracker));
+      if (residual[0] == null) {
+        // This means the call completed unsolicited, and the context produced by makeContext()
+        // did not take a checkpoint. Take one now.
+        residual[0] = checkNotNull(tracker.checkpoint());
+      }
+
+      // Save state for resuming.
+      if (!cont.shouldResume()) {
+        // All work for this element/restriction is completed. Clear state and release hold.
+        elementState.clear();
+        restrictionState.clear();
+        holdState.clear();
+        return;
+      }
+      restrictionState.write(residual[0]);
+      Instant futureOutputWatermark = cont.getWatermark();
+      if (futureOutputWatermark != null) {
+        holdState.add(futureOutputWatermark);
+      }
+      // Set a timer to continue processing this element.
+      TimerInternals timerInternals = c.windowingInternals().timerInternals();
+      timerInternals.setTimer(
+          TimerInternals.TimerData.of(
+              stateNamespace,
+              timerInternals.currentProcessingTime().plus(cont.resumeDelay()),
+              TimeDomain.PROCESSING_TIME));
+    }
+
+    private DoFn<InputT, OutputT>.ProcessContext makeContext(
+        final ProcessContext baseContext,
+        final WindowedValue<InputT> element,
+        final TrackerT tracker,
+        final RestrictionT[] residualRestrictionHolder) {
+      return fn.new ProcessContext() {
+        private int numOutputs = 0;
+
+        public InputT element() {
+          return element.getValue();
+        }
+
+        public Instant timestamp() {
+          return element.getTimestamp();
+        }
+
+        public PaneInfo pane() {
+          return element.getPane();
+        }
+
+        public void output(OutputT output) {
+          baseContext
+              .windowingInternals()
+              .outputWindowedValue(
+                  output, element.getTimestamp(), element.getWindows(), element.getPane());
+          noteOutput();
+        }
+
+        public void outputWithTimestamp(OutputT output, Instant timestamp) {
+          baseContext
+              .windowingInternals()
+              .outputWindowedValue(output, timestamp, element.getWindows(), element.getPane());
+          noteOutput();
+        }
+
+        private void noteOutput() {
+          if (++numOutputs >= MAX_OUTPUTS_PER_BUNDLE) {
+            // Request a checkpoint. The fn *may* produce more output, but hopefully not too much.
+            residualRestrictionHolder[0] = tracker.checkpoint();
+          }
+        }
+
+        public <T> T sideInput(PCollectionView<T> view) {
+          return baseContext.sideInput(view);
+        }
+
+        public PipelineOptions getPipelineOptions() {
+          return baseContext.getPipelineOptions();
+        }
+
+        public <T> void sideOutput(TupleTag<T> tag, T output) {
+          // TODO: I'm not sure how to implement this correctly: there's no
+          // "internals.sideOutputWindowedValue".
+          throw new UnsupportedOperationException(
+              "Side outputs not yet supported by splittable DoFn");
+        }
+
+        public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+          // TODO: I'm not sure how to implement this correctly: there's no
+          // "internals.sideOutputWindowedValue".
+          throw new UnsupportedOperationException(
+              "Side outputs not yet supported by splittable DoFn");
+        }
+
+        @Override
+        protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+            String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+          return fn.createAggregator(name, combiner);
+        }
+      };
+    }
+
+    /** Creates an {@link DoFn.ExtraContextFactory} that provides just the given tracker. */
+    private DoFn.ExtraContextFactory<InputT, OutputT> wrapTracker(final TrackerT tracker) {
+      return new ExtraContextFactoryForTracker<>(tracker);
+    }
+
+    private static class ExtraContextFactoryForTracker<
+            InputT, OutputT, TrackerT extends RestrictionTracker<?>>
+        implements DoFn.ExtraContextFactory<InputT, OutputT> {
+      private final TrackerT tracker;
+
+      ExtraContextFactoryForTracker(TrackerT tracker) {
+        this.tracker = tracker;
+      }
+
+      @Override
+      public BoundedWindow window() {
+        // DoFnSignatures should have verified that this DoFn doesn't access extra context.
+        throw new IllegalStateException("Unexpected extra context access on a splittable DoFn");
+      }
+
+      @Override
+      public DoFn.InputProvider<InputT> inputProvider() {
+        // DoFnSignatures should have verified that this DoFn doesn't access extra context.
+        throw new IllegalStateException("Unexpected extra context access on a splittable DoFn");
+      }
+
+      @Override
+      public DoFn.OutputReceiver<OutputT> outputReceiver() {
+        // DoFnSignatures should have verified that this DoFn doesn't access extra context.
+        throw new IllegalStateException("Unexpected extra context access on a splittable DoFn");
+      }
+
+      @Override
+      public WindowingInternals<InputT, OutputT> windowingInternals() {
+        // DoFnSignatures should have verified that this DoFn doesn't access extra context.
+        throw new IllegalStateException("Unexpected extra context access on a splittable DoFn");
+      }
+
+      @Override
+      public TrackerT restrictionTracker() {
+        return tracker;
+      }
+    }
+  }
+
+  /** Splits the restriction using the given {@link DoFn.SplitRestriction} method. */
+  private static class SplitRestrictionFn<InputT, RestrictionT>
+      extends DoFn<
+          ElementAndRestriction<InputT, RestrictionT>,
+          ElementAndRestriction<InputT, RestrictionT>> {
+    private final DoFn<InputT, ?> splittableFn;
+    private transient DoFnInvoker<InputT, ?> invoker;
+
+    SplitRestrictionFn(DoFn<InputT, ?> splittableFn) {
+      this.splittableFn = splittableFn;
+    }
+
+    @Setup
+    public void setup() {
+      invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(splittableFn);
+    }
+
+    @ProcessElement
+    public void processElement(final ProcessContext c) {
+      final InputT element = c.element().element();
+      invoker.invokeSplitRestriction(
+          element,
+          c.element().restriction(),
+          new OutputReceiver<RestrictionT>() {
+            @Override
+            public void output(RestrictionT part) {
+              c.output(ElementAndRestriction.of(element, part));
+            }
+          });
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java
new file mode 100644
index 0000000..f516046
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+/**
+ * Tests for {@link ElementAndRestrictionCoder}. Parroted from {@link
+ * org.apache.beam.sdk.coders.KvCoderTest}.
+ */
+@RunWith(Parameterized.class)
+public class ElementAndRestrictionCoderTest<K, V> {
+  private static class CoderAndData<T> {
+    Coder<T> coder;
+    List<T> data;
+  }
+
+  private static class AnyCoderAndData {
+    private CoderAndData<?> coderAndData;
+  }
+
+  private static <T> AnyCoderAndData coderAndData(Coder<T> coder, List<T> data) {
+    CoderAndData<T> coderAndData = new CoderAndData<>();
+    coderAndData.coder = coder;
+    coderAndData.data = data;
+    AnyCoderAndData res = new AnyCoderAndData();
+    res.coderAndData = coderAndData;
+    return res;
+  }
+
+  private static final List<AnyCoderAndData> TEST_DATA =
+      Arrays.asList(
+          coderAndData(
+              VarIntCoder.of(), Arrays.asList(-1, 0, 1, 13, Integer.MAX_VALUE, Integer.MIN_VALUE)),
+          coderAndData(
+              BigEndianLongCoder.of(),
+              Arrays.asList(-1L, 0L, 1L, 13L, Long.MAX_VALUE, Long.MIN_VALUE)),
+          coderAndData(StringUtf8Coder.of(), Arrays.asList("", "hello", "goodbye", "1")),
+          coderAndData(
+              ElementAndRestrictionCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
+              Arrays.asList(
+                  ElementAndRestriction.of("", -1),
+                  ElementAndRestriction.of("hello", 0),
+                  ElementAndRestriction.of("goodbye", Integer.MAX_VALUE))),
+          coderAndData(
+              ListCoder.of(VarLongCoder.of()),
+              Arrays.asList(Arrays.asList(1L, 2L, 3L), Collections.<Long>emptyList())));
+
+  @Parameterized.Parameters(name = "{index}: keyCoder={0} key={1} valueCoder={2} value={3}")
+  public static Collection<Object[]> data() {
+    List<Object[]> parameters = new ArrayList<>();
+    for (AnyCoderAndData keyCoderAndData : TEST_DATA) {
+      Coder keyCoder = keyCoderAndData.coderAndData.coder;
+      for (Object key : keyCoderAndData.coderAndData.data) {
+        for (AnyCoderAndData valueCoderAndData : TEST_DATA) {
+          Coder valueCoder = valueCoderAndData.coderAndData.coder;
+          for (Object value : valueCoderAndData.coderAndData.data) {
+            parameters.add(new Object[] {keyCoder, key, valueCoder, value});
+          }
+        }
+      }
+    }
+    return parameters;
+  }
+
+  @Parameter(0)
+  public Coder<K> keyCoder;
+  @Parameter(1)
+  public K key;
+  @Parameter(2)
+  public Coder<V> valueCoder;
+  @Parameter(3)
+  public V value;
+
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void testDecodeEncodeEqual() throws Exception {
+    CoderProperties.coderDecodeEncodeEqual(
+        ElementAndRestrictionCoder.of(keyCoder, valueCoder),
+        ElementAndRestriction.of(key, value));
+  }
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void encodeNullThrowsCoderException() throws Exception {
+    thrown.expect(CoderException.class);
+    thrown.expectMessage("cannot encode a null ElementAndRestriction");
+
+    CoderUtils.encodeToBase64(
+        ElementAndRestrictionCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
new file mode 100644
index 0000000..a76c4da
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SplittableParDo}. */
+@RunWith(JUnit4.class)
+public class SplittableParDoTest {
+  // ----------------- Tests for whether the transform sets boundedness correctly --------------
+  private static class SomeRestriction implements Serializable {}
+
+  private static class SomeRestrictionTracker implements RestrictionTracker<SomeRestriction> {
+    private final SomeRestriction someRestriction = new SomeRestriction();
+
+    @Override
+    public SomeRestriction currentRestriction() {
+      return someRestriction;
+    }
+
+    @Override
+    public SomeRestriction checkpoint() {
+      return someRestriction;
+    }
+  }
+
+  private static class BoundedFakeFn extends DoFn<Integer, String> {
+    @ProcessElement
+    public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer element) {
+      return null;
+    }
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return null;
+    }
+  }
+
+  private static class UnboundedFakeFn extends DoFn<Integer, String> {
+    @ProcessElement
+    public ProcessContinuation processElement(
+        ProcessContext context, SomeRestrictionTracker tracker) {
+      return stop();
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer element) {
+      return null;
+    }
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return null;
+    }
+  }
+
+  private static PCollection<Integer> makeUnboundedCollection(Pipeline pipeline) {
+    return pipeline
+        .apply("unbounded", Create.of(1, 2, 3))
+        .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+  }
+
+  private static PCollection<Integer> makeBoundedCollection(Pipeline pipeline) {
+    return pipeline
+        .apply("bounded", Create.of(1, 2, 3))
+        .setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBoundednessForBoundedFn() {
+    Pipeline pipeline = TestPipeline.create();
+    DoFn<Integer, String> boundedFn = new BoundedFakeFn();
+    assertEquals(
+        PCollection.IsBounded.BOUNDED,
+        makeBoundedCollection(pipeline)
+            .apply("bounded to bounded", new SplittableParDo<>(boundedFn))
+            .isBounded());
+    assertEquals(
+        PCollection.IsBounded.BOUNDED,
+        makeUnboundedCollection(pipeline)
+            .apply("bounded to unbounded", new SplittableParDo<>(boundedFn))
+            .isBounded());
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBoundednessForUnboundedFn() {
+    Pipeline pipeline = TestPipeline.create();
+    DoFn<Integer, String> unboundedFn = new UnboundedFakeFn();
+    assertEquals(
+        PCollection.IsBounded.BOUNDED,
+        makeBoundedCollection(pipeline)
+            .apply("unbounded to bounded", new SplittableParDo<>(unboundedFn))
+            .isBounded());
+    assertEquals(
+        PCollection.IsBounded.BOUNDED,
+        makeUnboundedCollection(pipeline)
+            .apply("unbounded to unbounded", new SplittableParDo<>(unboundedFn))
+            .isBounded());
+  }
+
+  // ------------------------------- Tests for ProcessFn ---------------------------------
+
+  /**
+   * A helper for testing {@link SplittableParDo.ProcessFn} on 1 element (but possibly over multiple
+   * {@link DoFn.ProcessElement} calls).
+   */
+  private static class ProcessFnTester<
+      InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> {
+    private final DoFnTester<
+            KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+        tester;
+    private Instant currentProcessingTime;
+
+    ProcessFnTester(
+        Instant currentProcessingTime,
+        DoFn<InputT, OutputT> fn,
+        Coder<InputT> inputCoder,
+        Coder<RestrictionT> restrictionCoder)
+        throws Exception {
+      SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
+          new SplittableParDo.ProcessFn<>(
+              fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
+      this.tester = DoFnTester.of(processFn);
+      this.tester.startBundle();
+      this.tester.advanceProcessingTime(currentProcessingTime);
+
+      this.currentProcessingTime = currentProcessingTime;
+    }
+
+    /** Performs a seed {@link DoFn.ProcessElement} call feeding the element and restriction. */
+    void startElement(InputT element, RestrictionT restriction) throws Exception {
+      startElement(
+          WindowedValue.of(
+              ElementAndRestriction.of(element, restriction),
+              currentProcessingTime,
+              GlobalWindow.INSTANCE,
+              PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+
+    void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue)
+        throws Exception {
+      tester.processElement(KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue)));
+    }
+
+    /**
+     * Advances processing time by a given duration and, if any timers fired, performs a non-seed
+     * {@link DoFn.ProcessElement} call, feeding it the timers.
+     */
+    boolean advanceProcessingTimeBy(Duration duration) throws Exception {
+      currentProcessingTime = currentProcessingTime.plus(duration);
+      List<TimerInternals.TimerData> timers = tester.advanceProcessingTime(currentProcessingTime);
+      if (timers.isEmpty()) {
+        return false;
+      }
+      tester.processElement(
+          KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
+              "key", timers));
+      return true;
+    }
+
+    List<TimestampedValue<OutputT>> peekOutputElementsInWindow(BoundedWindow window) {
+      return tester.peekOutputElementsInWindow(window);
+    }
+
+    List<OutputT> takeOutputElements() {
+      return tester.takeOutputElements();
+    }
+  }
+
+  /** A simple splittable {@link DoFn} that's actually monolithic. */
+  private static class ToStringFn extends DoFn<Integer, String> {
+    @ProcessElement
+    public void process(ProcessContext c, SomeRestrictionTracker tracker) {
+      c.output(c.element().toString() + "a");
+      c.output(c.element().toString() + "b");
+      c.output(c.element().toString() + "c");
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer elem) {
+      return new SomeRestriction();
+    }
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return new SomeRestrictionTracker();
+    }
+  }
+
+  @Test
+  public void testTrivialProcessFnPropagatesOutputsWindowsAndTimestamp() throws Exception {
+    // Tests that ProcessFn correctly propagates windows and timestamp of the element
+    // inside the KeyedWorkItem.
+    // The underlying DoFn is actually monolithic, so this doesn't test splitting.
+    DoFn<Integer, String> fn = new ToStringFn();
+
+    Instant base = Instant.now();
+    ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
+        new ProcessFnTester<>(
+            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class));
+
+    IntervalWindow w1 =
+        new IntervalWindow(
+            base.minus(Duration.standardMinutes(1)), base.plus(Duration.standardMinutes(1)));
+    IntervalWindow w2 =
+        new IntervalWindow(
+            base.minus(Duration.standardMinutes(2)), base.plus(Duration.standardMinutes(2)));
+    IntervalWindow w3 =
+        new IntervalWindow(
+            base.minus(Duration.standardMinutes(3)), base.plus(Duration.standardMinutes(3)));
+
+    tester.startElement(
+        WindowedValue.of(
+            ElementAndRestriction.of(42, new SomeRestriction()),
+            base,
+            Arrays.asList(w1, w2, w3),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    for (IntervalWindow w : new IntervalWindow[] {w1, w2, w3}) {
+      assertEquals(
+          Arrays.asList(
+              TimestampedValue.of("42a", base),
+              TimestampedValue.of("42b", base),
+              TimestampedValue.of("42c", base)),
+          tester.peekOutputElementsInWindow(w));
+    }
+  }
+
+  /** A simple splittable {@link DoFn} that outputs the given element every 5 seconds forever. */
+  private static class SelfInitiatedResumeFn extends DoFn<Integer, String> {
+    @ProcessElement
+    public ProcessContinuation process(ProcessContext c, SomeRestrictionTracker tracker) {
+      c.output(c.element().toString());
+      return resume().withResumeDelay(Duration.standardSeconds(5)).withWatermark(c.timestamp());
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer elem) {
+      return new SomeRestriction();
+    }
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return new SomeRestrictionTracker();
+    }
+  }
+
+  @Test
+  public void testResumeSetsTimer() throws Exception {
+    DoFn<Integer, String> fn = new SelfInitiatedResumeFn();
+    Instant base = Instant.now();
+    ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
+        new ProcessFnTester<>(
+            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class));
+
+    tester.startElement(42, new SomeRestriction());
+    assertThat(tester.takeOutputElements(), contains("42"));
+
+    // Should resume after 5 seconds: advancing by 3 seconds should have no effect.
+    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertTrue(tester.takeOutputElements().isEmpty());
+
+    // 6 seconds should be enough - should invoke the fn again.
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertThat(tester.takeOutputElements(), contains("42"));
+
+    // Should again resume after 5 seconds: advancing by 3 seconds should again have no effect.
+    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertTrue(tester.takeOutputElements().isEmpty());
+
+    // 6 seconds should again be enough.
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertThat(tester.takeOutputElements(), contains("42"));
+  }
+
+  private static class SomeCheckpoint implements Serializable {
+    private int firstUnprocessedIndex;
+
+    private SomeCheckpoint(int firstUnprocessedIndex) {
+      this.firstUnprocessedIndex = firstUnprocessedIndex;
+    }
+  }
+
+  private static class SomeCheckpointTracker implements RestrictionTracker<SomeCheckpoint> {
+    private SomeCheckpoint current;
+    private boolean isActive = true;
+
+    private SomeCheckpointTracker(SomeCheckpoint current) {
+      this.current = current;
+    }
+
+    @Override
+    public SomeCheckpoint currentRestriction() {
+      return current;
+    }
+
+    public boolean tryUpdateCheckpoint(int firstUnprocessedIndex) {
+      if (!isActive) {
+        return false;
+      }
+      current = new SomeCheckpoint(firstUnprocessedIndex);
+      return true;
+    }
+
+    @Override
+    public SomeCheckpoint checkpoint() {
+      isActive = false;
+      return current;
+    }
+  }
+
+  /**
+   * A splittable {@link DoFn} that generates the sequence [init, init + total) in batches of given
+   * size.
+   */
+  private static class CounterFn extends DoFn<Integer, String> {
+    private final int numTotalOutputs;
+    private final int numOutputsPerCall;
+
+    private CounterFn(int numTotalOutputs, int numOutputsPerCall) {
+      this.numTotalOutputs = numTotalOutputs;
+      this.numOutputsPerCall = numOutputsPerCall;
+    }
+
+    @ProcessElement
+    public ProcessContinuation process(ProcessContext c, SomeCheckpointTracker tracker) {
+      int start = tracker.currentRestriction().firstUnprocessedIndex;
+      for (int i = 0; i < numOutputsPerCall; ++i) {
+        int index = start + i;
+        if (!tracker.tryUpdateCheckpoint(index + 1)) {
+          return resume();
+        }
+        if (index >= numTotalOutputs) {
+          return stop();
+        }
+        c.output(String.valueOf(c.element() + index));
+      }
+      return resume();
+    }
+
+    @GetInitialRestriction
+    public SomeCheckpoint getInitialRestriction(Integer elem) {
+      throw new UnsupportedOperationException("Expected to be supplied explicitly in this test");
+    }
+
+    @NewTracker
+    public SomeCheckpointTracker newTracker(SomeCheckpoint restriction) {
+      return new SomeCheckpointTracker(restriction);
+    }
+  }
+
+  @Test
+  public void testResumeCarriesOverState() throws Exception {
+    DoFn<Integer, String> fn = new CounterFn(3, 1);
+    Instant base = Instant.now();
+    ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> tester =
+        new ProcessFnTester<>(
+            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeCheckpoint.class));
+
+    tester.startElement(42, new SomeCheckpoint(0));
+    assertThat(tester.takeOutputElements(), contains("42"));
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    assertThat(tester.takeOutputElements(), contains("43"));
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    assertThat(tester.takeOutputElements(), contains("44"));
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    // After outputting all 3 items, should not output anything more.
+    assertEquals(0, tester.takeOutputElements().size());
+    // Should also not ask to resume.
+    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+  }
+
+  @Test
+  public void testReactsToCheckpoint() throws Exception {
+    int max = SplittableParDo.ProcessFn.MAX_OUTPUTS_PER_BUNDLE;
+    // Create an fn that attempts to 2x output more than checkpointing allows.
+    DoFn<Integer, String> fn = new CounterFn(2 * max + max / 2, 2 * max);
+    Instant base = Instant.now();
+    int baseIndex = 42;
+
+    ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> tester =
+        new ProcessFnTester<>(
+            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeCheckpoint.class));
+
+    List<String> elements;
+
+    tester.startElement(baseIndex, new SomeCheckpoint(0));
+    elements = tester.takeOutputElements();
+    assertEquals(max, elements.size());
+    // Should output the range [0, max)
+    assertThat(elements, hasItem(String.valueOf(baseIndex)));
+    assertThat(elements, hasItem(String.valueOf(baseIndex + max - 1)));
+
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    elements = tester.takeOutputElements();
+    assertEquals(max, elements.size());
+    // Should output the range [max, 2*max)
+    assertThat(elements, hasItem(String.valueOf(baseIndex + max)));
+    assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max - 1)));
+
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    elements = tester.takeOutputElements();
+    assertEquals(max / 2, elements.size());
+    // Should output the range [2*max, 2*max + max/2)
+    assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max)));
+    assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max + max / 2 - 1)));
+    assertThat(elements, not(hasItem((String.valueOf(baseIndex + 2 * max + max / 2)))));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
new file mode 100644
index 0000000..e232552
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItemCoder;
+import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/** Provides an implementation of {@link GBKIntoKeyedWorkItems} for the Direct Runner. */
+class DirectGBKIntoKeyedWorkItemsOverrideFactory implements PTransformOverrideFactory {
+  @Override
+  @SuppressWarnings("unchecked")
+  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+      PTransform<InputT, OutputT> transform) {
+    return new DirectGBKIntoKeyedWorkItems(transform.getName());
+  }
+
+  /** The Direct Runner specific implementation of {@link GBKIntoKeyedWorkItems}. */
+  private static class DirectGBKIntoKeyedWorkItems<KeyT, InputT>
+      extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
+    DirectGBKIntoKeyedWorkItems(String name) {
+      super(name);
+    }
+
+    @Override
+    public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
+      checkArgument(input.getCoder() instanceof KvCoder);
+      KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
+      return input
+          .apply(new ReifyTimestampsAndWindows<KeyT, InputT>())
+          // TODO: Perhaps windowing strategy should instead be set by ReifyTAW, or by DGBKO
+          .setWindowingStrategyInternal(WindowingStrategy.globalDefault())
+          .apply(new DirectGroupByKey.DirectGroupByKeyOnly<KeyT, InputT>())
+          .setCoder(
+              KeyedWorkItemCoder.of(
+                  kvCoder.getKeyCoder(),
+                  kvCoder.getValueCoder(),
+                  input.getWindowingStrategy().getWindowFn().windowCoder()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 224101a..a72f7ae 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -83,6 +84,10 @@ public class DirectRunner
               .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory())
               .put(TestStream.class, new DirectTestStreamFactory())
               .put(Write.Bound.class, new WriteWithShardingFactory())
+              .put(ParDo.Bound.class, new ParDoOverrideFactory())
+              .put(
+                  GBKIntoKeyedWorkItems.class,
+                  new DirectGBKIntoKeyedWorkItemsOverrideFactory())
               .build();
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
new file mode 100644
index 0000000..a57735c
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/**
+ * A {@link PTransformOverrideFactory} that provides overrides for applications of a {@link ParDo}
+ * in the direct runner. Currently overrides applications of <a
+ * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>.
+ */
+class ParDoOverrideFactory implements PTransformOverrideFactory {
+  @Override
+  @SuppressWarnings("unchecked")
+  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+      PTransform<InputT, OutputT> transform) {
+    if (!(transform instanceof ParDo.Bound)) {
+      return transform;
+    }
+    ParDo.Bound<InputT, OutputT> that = (ParDo.Bound<InputT, OutputT>) transform;
+    DoFn<InputT, OutputT> fn = DoFnAdapters.getDoFn(that.getFn());
+    if (fn == null) {
+      // This is an OldDoFn, hence not splittable.
+      return transform;
+    }
+    DoFnSignature signature = DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass());
+    if (!signature.processElement().isSplittable()) {
+      return transform;
+    }
+    return new SplittableParDo(fn);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
new file mode 100644
index 0000000..84a0cd9
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.MutableDateTime;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for <a href="https://s.apache.org/splittable-do-fn>splittable</a> {@link DoFn} behavior
+ * using the direct runner.
+ *
+ * <p>TODO: make this use @RunnableOnService.
+ */
+@RunWith(JUnit4.class)
+public class SplittableDoFnTest {
+  static class OffsetRange implements Serializable {
+    public final int from;
+    public final int to;
+
+    OffsetRange(int from, int to) {
+      this.from = from;
+      this.to = to;
+    }
+  }
+
+  private static class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
+    private OffsetRange range;
+    private Integer lastClaimedIndex = null;
+
+    OffsetRangeTracker(OffsetRange range) {
+      this.range = checkNotNull(range);
+    }
+
+    @Override
+    public OffsetRange currentRestriction() {
+      return range;
+    }
+
+    @Override
+    public OffsetRange checkpoint() {
+      if (lastClaimedIndex == null) {
+        OffsetRange res = range;
+        range = new OffsetRange(range.from, range.from);
+        return res;
+      }
+      OffsetRange res = new OffsetRange(lastClaimedIndex + 1, range.to);
+      this.range = new OffsetRange(range.from, lastClaimedIndex + 1);
+      return res;
+    }
+
+    boolean tryClaim(int i) {
+      checkState(lastClaimedIndex == null || i > lastClaimedIndex);
+      if (i >= range.to) {
+        return false;
+      }
+      lastClaimedIndex = i;
+      return true;
+    }
+  }
+
+  static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
+    @ProcessElement
+    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
+      for (int i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
+        c.output(KV.of(c.element(), i));
+        if (i % 3 == 0) {
+          return ProcessContinuation.resume();
+        }
+      }
+      return ProcessContinuation.stop();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRange(String element) {
+      return new OffsetRange(0, element.length());
+    }
+
+    @SplitRestriction
+    public void splitRange(
+        String element, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
+      receiver.output(new OffsetRange(range.from, (range.from + range.to) / 2));
+      receiver.output(new OffsetRange((range.from + range.to) / 2, range.to));
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(OffsetRange range) {
+      return new OffsetRangeTracker(range);
+    }
+  }
+
+  private static class ReifyTimestampsFn<T> extends DoFn<T, TimestampedValue<T>> {
+    @ProcessElement
+    public void process(ProcessContext c) {
+      c.output(TimestampedValue.of(c.element(), c.timestamp()));
+    }
+  }
+
+  @Test
+  public void testPairWithIndexBasic() throws ClassNotFoundException {
+    Pipeline p = TestPipeline.create();
+    p.getOptions().setRunner(DirectRunner.class);
+    PCollection<KV<String, Integer>> res =
+        p.apply(Create.of("a", "bb", "ccccc"))
+            .apply(ParDo.of(new PairStringWithIndexToLength()))
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
+
+    PAssert.that(res)
+        .containsInAnyOrder(
+            Arrays.asList(
+                KV.of("a", 0),
+                KV.of("bb", 0),
+                KV.of("bb", 1),
+                KV.of("ccccc", 0),
+                KV.of("ccccc", 1),
+                KV.of("ccccc", 2),
+                KV.of("ccccc", 3),
+                KV.of("ccccc", 4)));
+
+    p.run();
+  }
+
+  @Test
+  public void testPairWithIndexWindowedTimestamped() throws ClassNotFoundException {
+    // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps
+    // of elements in the input collection.
+    Pipeline p = TestPipeline.create();
+    p.getOptions().setRunner(DirectRunner.class);
+
+    MutableDateTime mutableNow = Instant.now().toMutableDateTime();
+    mutableNow.setMillisOfSecond(0);
+    Instant now = mutableNow.toInstant();
+    Instant nowP1 = now.plus(Duration.standardSeconds(1));
+    Instant nowP2 = now.plus(Duration.standardSeconds(2));
+
+    SlidingWindows windowFn =
+        SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
+    PCollection<KV<String, Integer>> res =
+        p.apply(
+                Create.timestamped(
+                    TimestampedValue.of("a", now),
+                    TimestampedValue.of("bb", nowP1),
+                    TimestampedValue.of("ccccc", nowP2)))
+            .apply(Window.<String>into(windowFn))
+            .apply(ParDo.of(new PairStringWithIndexToLength()))
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
+
+    assertEquals(windowFn, res.getWindowingStrategy().getWindowFn());
+
+    PCollection<TimestampedValue<KV<String, Integer>>> timestamped =
+        res.apply("Reify timestamps", ParDo.of(new ReifyTimestampsFn<KV<String, Integer>>()));
+
+    for (int i = 0; i < 4; ++i) {
+      Instant base = now.minus(Duration.standardSeconds(i));
+      IntervalWindow window = new IntervalWindow(base, base.plus(Duration.standardSeconds(5)));
+
+      List<TimestampedValue<KV<String, Integer>>> expectedUnfiltered =
+          Arrays.asList(
+              TimestampedValue.of(KV.of("a", 0), now),
+              TimestampedValue.of(KV.of("bb", 0), nowP1),
+              TimestampedValue.of(KV.of("bb", 1), nowP1),
+              TimestampedValue.of(KV.of("ccccc", 0), nowP2),
+              TimestampedValue.of(KV.of("ccccc", 1), nowP2),
+              TimestampedValue.of(KV.of("ccccc", 2), nowP2),
+              TimestampedValue.of(KV.of("ccccc", 3), nowP2),
+              TimestampedValue.of(KV.of("ccccc", 4), nowP2));
+
+      List<TimestampedValue<KV<String, Integer>>> expected = new ArrayList<>();
+      for (TimestampedValue<KV<String, Integer>> tv : expectedUnfiltered) {
+        if (!window.start().isAfter(tv.getTimestamp())
+            && !tv.getTimestamp().isAfter(window.maxTimestamp())) {
+          expected.add(tv);
+        }
+      }
+      assertFalse(expected.isEmpty());
+
+      PAssert.that(timestamped).inWindow(window).containsInAnyOrder(expected);
+    }
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index f806926..789f4b2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -76,6 +76,12 @@ public @interface Experimental {
     TIMERS,
 
     /** Experimental APIs related to customizing the output time for computed values. */
-    OUTPUT_TIME
+    OUTPUT_TIME,
+
+    /**
+     * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>.
+     * Do not use: API is unstable and runner support is incomplete.
+     */
+    SPLITTABLE_DO_FN,
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index fb7fbd4..62da28c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.auto.value.AutoValue;
 import java.io.Serializable;
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
@@ -29,6 +30,8 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -37,9 +40,11 @@ import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -341,14 +346,20 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
      */
     @Deprecated
     WindowingInternals<InputT, OutputT> windowingInternals();
+
+    /**
+     * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with
+     * the current {@link ProcessElement} call.
+     */
+    <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker();
   }
 
-  /** A placeholder for testing handling of output types during {@link DoFn} reflection. */
+  /** Receives values of the given type. */
   public interface OutputReceiver<T> {
     void output(T output);
   }
 
-  /** A placeholder for testing handling of input types during {@link DoFn} reflection. */
+  /** Provides a single value of the given type. */
   public interface InputProvider<T> {
     T get();
   }
@@ -375,6 +386,10 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
     public WindowingInternals<InputT, OutputT> windowingInternals() {
       return null;
     }
+
+    public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+      return null;
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -412,14 +427,57 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   public @interface StartBundle {}
 
   /**
-   * Annotation for the method to use for processing elements. A subclass of
-   * {@link DoFn} must have a method with this annotation satisfying
-   * the following constraints in order for it to be executable:
+   * Annotation for the method to use for processing elements. A subclass of {@link DoFn} must have
+   * a method with this annotation.
+   *
+   * <p>The signature of this method must satisfy the following constraints:
+   *
    * <ul>
-   *   <li>It must have at least one argument.
-   *   <li>Its first argument must be a {@link DoFn.ProcessContext}.
-   *   <li>Its remaining argument, if any, must be {@link BoundedWindow}.
+   * <li>Its first argument must be a {@link DoFn.ProcessContext}.
+   * <li>If one of its arguments is a subtype of {@link RestrictionTracker}, then it is a <a
+   *     href="https://s.apache.org/splittable-do-fn>splittable</a> {@link DoFn} subject to the
+   *     separate requirements described below. Items below are assuming this is not a splittable
+   *     {@link DoFn}.
+   * <li>If one of its arguments is {@link BoundedWindow}, this argument corresponds to the window
+   *     of the current element. If absent, a runner may perform additional optimizations.
+   * <li>It must return {@code void}.
    * </ul>
+   *
+   * <h2>Splittable DoFn's (WARNING: work in progress, do not use)</h2>
+   *
+   * <p>A {@link DoFn} is <i>splittable</i> if its {@link ProcessElement} method has a parameter
+   * whose type is a subtype of {@link RestrictionTracker}. This is an advanced feature and an
+   * overwhelming majority of users will never need to write a splittable {@link DoFn}. Right now
+   * the implementation of this feature is in progress and it's not ready for any use.
+   *
+   * <p>See <a href="https://s.apache.org/splittable-do-fn">the proposal</a> for an overview of the
+   * involved concepts (<i>splittable DoFn</i>, <i>restriction</i>, <i>restriction tracker</i>).
+   *
+   * <p>If a {@link DoFn} is splittable, the following constraints must be respected:
+   *
+   * <ul>
+   * <li>It <i>must</i> define a {@link GetInitialRestriction} method.
+   * <li>It <i>may</i> define a {@link SplitRestriction} method.
+   * <li>It <i>must</i> define a {@link NewTracker} method returning the same type as the type of
+   *     the {@link RestrictionTracker} argument of {@link ProcessElement}, which in turn must be a
+   *     subtype of {@code RestrictionTracker<R>} where {@code R} is the restriction type returned
+   *     by {@link GetInitialRestriction}.
+   * <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
+   * <li>The type of restrictions used by all of these methods must be the same.
+   * <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to
+   *     indicate whether there is more work to be done for the current element.
+   * <li>Its {@link ProcessElement} method <i>must not</i> use any extra context parameters, such as
+   *     {@link BoundedWindow}.
+   * <li>The {@link DoFn} itself <i>may</i> be annotated with {@link BoundedPerElement} or
+   *     {@link UnboundedPerElement}, but not both at the same time. If it's not annotated with
+   *     either of these, it's assumed to be {@link BoundedPerElement} if its {@link
+   *     ProcessElement} method returns {@code void} and {@link UnboundedPerElement} if it
+   *     returns a {@link ProcessContinuation}.
+   * </ul>
+   *
+   * <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods.
+   *
+   * <p>More documentation will be added when the feature becomes ready for general usage.
    */
   @Documented
   @Retention(RetentionPolicy.RUNTIME)
@@ -455,6 +513,150 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   }
 
   /**
+   * Annotation for the method that maps an element to an initial restriction for a <a
+   * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+   *
+   * <p>Signature: {@code RestrictionT getInitialRestriction(InputT element);}
+   *
+   * <p>TODO: Make the InputT parameter optional.
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface GetInitialRestriction {}
+
+  /**
+   * Annotation for the method that returns the coder to use for the restriction of a <a
+   * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+   *
+   * <p>If not defined, a coder will be inferred using standard coder inference rules and the
+   * pipeline's {@link Pipeline#getCoderRegistry coder registry}.
+   *
+   * <p>This method will be called only at pipeline construction time.
+   *
+   * <p>Signature: {@code Coder<RestrictionT> getRestrictionCoder();}
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface GetRestrictionCoder {}
+
+  /**
+   * Annotation for the method that splits restriction of a <a
+   * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into multiple parts to
+   * be processed in parallel.
+   *
+   * <p>Signature: {@code List<RestrictionT> splitRestriction( InputT element, RestrictionT
+   * restriction);}
+   *
+   * <p>Optional: if this method is omitted, the restriction will not be split (equivalent to
+   * defining the method and returning {@code Collections.singletonList(restriction)}).
+   *
+   * <p>TODO: Introduce a parameter for controlling granularity of splitting, e.g. numParts. TODO:
+   * Make the InputT parameter optional.
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface SplitRestriction {}
+
+  /**
+   * Annotation for the method that creates a new {@link RestrictionTracker} for the restriction of
+   * a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+   *
+   * <p>Signature: {@code MyRestrictionTracker newTracker(RestrictionT restriction);} where {@code
+   * MyRestrictionTracker} must be a subtype of {@code RestrictionTracker<RestrictionT>}.
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface NewTracker {}
+
+  /**
+   * Annotation on a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}
+   * specifying that the {@link DoFn} performs a bounded amount of work per input element, so
+   * applying it to a bounded {@link PCollection} will produce also a bounded {@link PCollection}.
+   * It is an error to specify this on a non-splittable {@link DoFn}.
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.TYPE)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface BoundedPerElement {}
+
+  /**
+   * Annotation on a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}
+   * specifying that the {@link DoFn} performs an unbounded amount of work per input element, so
+   * applying it to a bounded {@link PCollection} will produce an unbounded {@link PCollection}. It
+   * is an error to specify this on a non-splittable {@link DoFn}.
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.TYPE)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface UnboundedPerElement {}
+
+  // This can't be put into ProcessContinuation itself due to the following problem:
+  // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html
+  private static final ProcessContinuation PROCESS_CONTINUATION_STOP =
+      new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO, null);
+
+  /**
+   * When used as a return value of {@link ProcessElement}, indicates whether there is more work to
+   * be done for the current element.
+   */
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  @AutoValue
+  public abstract static class ProcessContinuation {
+    /** Indicates that there is no more work to be done for the current element. */
+    public static ProcessContinuation stop() {
+      return PROCESS_CONTINUATION_STOP;
+    }
+
+    /** Indicates that there is more work to be done for the current element. */
+    public static ProcessContinuation resume() {
+      return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO, null);
+    }
+
+    /**
+     * If false, the {@link DoFn} promises that there is no more work remaining for the current
+     * element, so the runner should not resume the {@link ProcessElement} call.
+     */
+    public abstract boolean shouldResume();
+
+    /**
+     * A minimum duration that should elapse between the end of this {@link ProcessElement} call and
+     * the {@link ProcessElement} call continuing processing of the same element. By default, zero.
+     */
+    public abstract Duration resumeDelay();
+
+    /**
+     * A lower bound provided by the {@link DoFn} on timestamps of the output that will be emitted
+     * by future {@link ProcessElement} calls continuing processing of the current element.
+     *
+     * <p>A runner should treat an absent value as equivalent to the timestamp of the input element.
+     */
+    @Nullable
+    public abstract Instant getWatermark();
+
+    /** Builder method to set the value of {@link #resumeDelay()}. */
+    public ProcessContinuation withResumeDelay(Duration resumeDelay) {
+      return new AutoValue_DoFn_ProcessContinuation(
+          shouldResume(), resumeDelay, getWatermark());
+    }
+
+    /** Builder method to set the value of {@link #getWatermark()}. */
+    public ProcessContinuation withWatermark(Instant watermark) {
+      return new AutoValue_DoFn_ProcessContinuation(
+          shouldResume(), resumeDelay(), watermark);
+    }
+  }
+
+  /**
    * Returns an {@link Aggregator} with aggregation logic specified by the
    * {@link CombineFn} argument. The name provided must be unique across
    * {@link Aggregator}s created within the {@link DoFn}. Aggregators can only be created