You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/02 03:27:29 UTC

[2/2] incubator-beam git commit: Category for tests using splittable DoFn

Category for tests using splittable DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/887b357f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/887b357f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/887b357f

Branch: refs/heads/master
Commit: 887b357f7dff9fe10914b32dc69d32f0716fa237
Parents: 63491bf
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Nov 30 12:55:45 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 1 19:26:37 2016 -0800

----------------------------------------------------------------------
 runners/apex/pom.xml                            |   1 +
 .../beam/runners/direct/SplittableDoFnTest.java | 407 -------------------
 runners/flink/runner/pom.xml                    |   1 +
 runners/google-cloud-dataflow-java/pom.xml      |   1 +
 runners/spark/pom.xml                           |   1 +
 .../beam/sdk/testing/UsesSplittableParDo.java   |  25 ++
 .../beam/sdk/transforms/SplittableDoFnTest.java | 401 ++++++++++++++++++
 7 files changed, 430 insertions(+), 407 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/887b357f/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 84185b8..983781d 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -186,6 +186,7 @@
             <configuration>
               <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
               <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
+              <excludedGroups>org.apache.beam.sdk.testing.UsesSplittableParDo</excludedGroups>
               <parallel>none</parallel>
               <failIfNoTests>true</failIfNoTests>
               <dependenciesToScan>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/887b357f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
deleted file mode 100644
index f9e833f..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestStream;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.MutableDateTime;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for <a href="https://s.apache.org/splittable-do-fn>splittable</a> {@link DoFn} behavior
- * using the direct runner.
- *
- * <p>TODO: make this use @RunnableOnService.
- */
-@RunWith(JUnit4.class)
-public class SplittableDoFnTest {
-  static class OffsetRange implements Serializable {
-    public final int from;
-    public final int to;
-
-    OffsetRange(int from, int to) {
-      this.from = from;
-      this.to = to;
-    }
-
-    @Override
-    public String toString() {
-      return "OffsetRange{" + "from=" + from + ", to=" + to + '}';
-    }
-  }
-
-  private static class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
-    private OffsetRange range;
-    private Integer lastClaimedIndex = null;
-
-    OffsetRangeTracker(OffsetRange range) {
-      this.range = checkNotNull(range);
-    }
-
-    @Override
-    public OffsetRange currentRestriction() {
-      return range;
-    }
-
-    @Override
-    public OffsetRange checkpoint() {
-      if (lastClaimedIndex == null) {
-        OffsetRange res = range;
-        range = new OffsetRange(range.from, range.from);
-        return res;
-      }
-      OffsetRange res = new OffsetRange(lastClaimedIndex + 1, range.to);
-      this.range = new OffsetRange(range.from, lastClaimedIndex + 1);
-      return res;
-    }
-
-    boolean tryClaim(int i) {
-      checkState(lastClaimedIndex == null || i > lastClaimedIndex);
-      if (i >= range.to) {
-        return false;
-      }
-      lastClaimedIndex = i;
-      return true;
-    }
-  }
-
-  static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
-    @ProcessElement
-    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
-      for (int i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
-        c.output(KV.of(c.element(), i));
-        if (i % 3 == 0) {
-          return ProcessContinuation.resume();
-        }
-      }
-      return ProcessContinuation.stop();
-    }
-
-    @GetInitialRestriction
-    public OffsetRange getInitialRange(String element) {
-      return new OffsetRange(0, element.length());
-    }
-
-    @SplitRestriction
-    public void splitRange(
-        String element, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
-      receiver.output(new OffsetRange(range.from, (range.from + range.to) / 2));
-      receiver.output(new OffsetRange((range.from + range.to) / 2, range.to));
-    }
-
-    @NewTracker
-    public OffsetRangeTracker newTracker(OffsetRange range) {
-      return new OffsetRangeTracker(range);
-    }
-  }
-
-  private static class ReifyTimestampsFn<T> extends DoFn<T, TimestampedValue<T>> {
-    @ProcessElement
-    public void process(ProcessContext c) {
-      c.output(TimestampedValue.of(c.element(), c.timestamp()));
-    }
-  }
-
-  @Test
-  public void testPairWithIndexBasic() {
-    Pipeline p = TestPipeline.create();
-    p.getOptions().setRunner(DirectRunner.class);
-    PCollection<KV<String, Integer>> res =
-        p.apply(Create.of("a", "bb", "ccccc"))
-            .apply(ParDo.of(new PairStringWithIndexToLength()))
-            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
-
-    PAssert.that(res)
-        .containsInAnyOrder(
-            Arrays.asList(
-                KV.of("a", 0),
-                KV.of("bb", 0),
-                KV.of("bb", 1),
-                KV.of("ccccc", 0),
-                KV.of("ccccc", 1),
-                KV.of("ccccc", 2),
-                KV.of("ccccc", 3),
-                KV.of("ccccc", 4)));
-
-    p.run();
-  }
-
-  @Test
-  public void testPairWithIndexWindowedTimestamped() {
-    // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps
-    // of elements in the input collection.
-    Pipeline p = TestPipeline.create();
-    p.getOptions().setRunner(DirectRunner.class);
-
-    MutableDateTime mutableNow = Instant.now().toMutableDateTime();
-    mutableNow.setMillisOfSecond(0);
-    Instant now = mutableNow.toInstant();
-    Instant nowP1 = now.plus(Duration.standardSeconds(1));
-    Instant nowP2 = now.plus(Duration.standardSeconds(2));
-
-    SlidingWindows windowFn =
-        SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
-    PCollection<KV<String, Integer>> res =
-        p.apply(
-                Create.timestamped(
-                    TimestampedValue.of("a", now),
-                    TimestampedValue.of("bb", nowP1),
-                    TimestampedValue.of("ccccc", nowP2)))
-            .apply(Window.<String>into(windowFn))
-            .apply(ParDo.of(new PairStringWithIndexToLength()))
-            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
-
-    assertEquals(windowFn, res.getWindowingStrategy().getWindowFn());
-
-    PCollection<TimestampedValue<KV<String, Integer>>> timestamped =
-        res.apply("Reify timestamps", ParDo.of(new ReifyTimestampsFn<KV<String, Integer>>()));
-
-    for (int i = 0; i < 4; ++i) {
-      Instant base = now.minus(Duration.standardSeconds(i));
-      IntervalWindow window = new IntervalWindow(base, base.plus(Duration.standardSeconds(5)));
-
-      List<TimestampedValue<KV<String, Integer>>> expectedUnfiltered =
-          Arrays.asList(
-              TimestampedValue.of(KV.of("a", 0), now),
-              TimestampedValue.of(KV.of("bb", 0), nowP1),
-              TimestampedValue.of(KV.of("bb", 1), nowP1),
-              TimestampedValue.of(KV.of("ccccc", 0), nowP2),
-              TimestampedValue.of(KV.of("ccccc", 1), nowP2),
-              TimestampedValue.of(KV.of("ccccc", 2), nowP2),
-              TimestampedValue.of(KV.of("ccccc", 3), nowP2),
-              TimestampedValue.of(KV.of("ccccc", 4), nowP2));
-
-      List<TimestampedValue<KV<String, Integer>>> expected = new ArrayList<>();
-      for (TimestampedValue<KV<String, Integer>> tv : expectedUnfiltered) {
-        if (!window.start().isAfter(tv.getTimestamp())
-            && !tv.getTimestamp().isAfter(window.maxTimestamp())) {
-          expected.add(tv);
-        }
-      }
-      assertFalse(expected.isEmpty());
-
-      PAssert.that(timestamped).inWindow(window).containsInAnyOrder(expected);
-    }
-    p.run();
-  }
-
-  private static class SDFWithSideInputsAndOutputs extends DoFn<Integer, String> {
-    private final PCollectionView<String> sideInput;
-    private final TupleTag<String> sideOutput;
-
-    private SDFWithSideInputsAndOutputs(
-        PCollectionView<String> sideInput, TupleTag<String> sideOutput) {
-      this.sideInput = sideInput;
-      this.sideOutput = sideOutput;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c, OffsetRangeTracker tracker) {
-      checkState(tracker.tryClaim(tracker.currentRestriction().from));
-      String side = c.sideInput(sideInput);
-      c.output("main:" + side + ":" + c.element());
-      c.sideOutput(sideOutput, "side:" + side + ":" + c.element());
-    }
-
-    @GetInitialRestriction
-    public OffsetRange getInitialRestriction(Integer value) {
-      return new OffsetRange(0, 1);
-    }
-
-    @NewTracker
-    public OffsetRangeTracker newTracker(OffsetRange range) {
-      return new OffsetRangeTracker(range);
-    }
-  }
-
-  @Test
-  public void testSideInputsAndOutputs() throws Exception {
-    Pipeline p = TestPipeline.create();
-    p.getOptions().setRunner(DirectRunner.class);
-
-    PCollectionView<String> sideInput =
-        p.apply("side input", Create.of("foo")).apply(View.<String>asSingleton());
-    TupleTag<String> mainOutputTag = new TupleTag<>("main");
-    TupleTag<String> sideOutputTag = new TupleTag<>("side");
-
-    PCollectionTuple res =
-        p.apply("input", Create.of(0, 1, 2))
-            .apply(
-                ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, sideOutputTag))
-                    .withSideInputs(sideInput)
-                    .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
-    res.get(mainOutputTag).setCoder(StringUtf8Coder.of());
-    res.get(sideOutputTag).setCoder(StringUtf8Coder.of());
-
-    PAssert.that(res.get(mainOutputTag))
-        .containsInAnyOrder(Arrays.asList("main:foo:0", "main:foo:1", "main:foo:2"));
-    PAssert.that(res.get(sideOutputTag))
-        .containsInAnyOrder(Arrays.asList("side:foo:0", "side:foo:1", "side:foo:2"));
-
-    p.run();
-  }
-
-  @Test
-  public void testLateData() throws Exception {
-    Pipeline p = TestPipeline.create();
-    p.getOptions().setRunner(DirectRunner.class);
-
-    Instant base = Instant.now();
-
-    TestStream<String> stream =
-        TestStream.create(StringUtf8Coder.of())
-            .advanceWatermarkTo(base)
-            .addElements("aa")
-            .advanceWatermarkTo(base.plus(Duration.standardSeconds(5)))
-            .addElements(TimestampedValue.of("bb", base.minus(Duration.standardHours(1))))
-            .advanceProcessingTime(Duration.standardHours(1))
-            .advanceWatermarkToInfinity();
-
-    PCollection<String> input =
-        p.apply(stream)
-            .apply(
-                Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
-                    .withAllowedLateness(Duration.standardMinutes(1)));
-
-    PCollection<KV<String, Integer>> afterSDF =
-        input
-            .apply(ParDo.of(new PairStringWithIndexToLength()))
-            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
-
-    PCollection<String> nonLate =
-        afterSDF.apply(GroupByKey.<String, Integer>create()).apply(Keys.<String>create());
-
-    // The splittable DoFn itself should not drop any data and act as pass-through.
-    PAssert.that(afterSDF)
-        .containsInAnyOrder(
-            Arrays.asList(KV.of("aa", 0), KV.of("aa", 1), KV.of("bb", 0), KV.of("bb", 1)));
-
-    // But it should preserve the windowing strategy of the data, including allowed lateness:
-    // the follow-up GBK should drop the late data.
-    assertEquals(afterSDF.getWindowingStrategy(), input.getWindowingStrategy());
-    PAssert.that(nonLate).containsInAnyOrder("aa");
-
-    p.run();
-  }
-
-  private static class SDFWithLifecycle extends DoFn<String, String> {
-    private enum State {
-      BEFORE_SETUP,
-      OUTSIDE_BUNDLE,
-      INSIDE_BUNDLE,
-      TORN_DOWN
-    }
-
-    private State state = State.BEFORE_SETUP;
-
-    @ProcessElement
-    public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
-      assertEquals(State.INSIDE_BUNDLE, state);
-      assertTrue(tracker.tryClaim(0));
-      c.output(c.element());
-    }
-
-    @GetInitialRestriction
-    public OffsetRange getInitialRestriction(String value) {
-      return new OffsetRange(0, 1);
-    }
-
-    @NewTracker
-    public OffsetRangeTracker newTracker(OffsetRange range) {
-      return new OffsetRangeTracker(range);
-    }
-
-    @Setup
-    public void setUp() {
-      assertEquals(State.BEFORE_SETUP, state);
-      state = State.OUTSIDE_BUNDLE;
-    }
-
-    @StartBundle
-    public void startBundle(Context c) {
-      assertEquals(State.OUTSIDE_BUNDLE, state);
-      state = State.INSIDE_BUNDLE;
-    }
-
-    @FinishBundle
-    public void finishBundle(Context c) {
-      assertEquals(State.INSIDE_BUNDLE, state);
-      state = State.OUTSIDE_BUNDLE;
-    }
-
-    @Teardown
-    public void tearDown() {
-      assertEquals(State.OUTSIDE_BUNDLE, state);
-      state = State.TORN_DOWN;
-    }
-  }
-
-  @Test
-  public void testLifecycleMethods() throws Exception {
-    Pipeline p = TestPipeline.create();
-    p.getOptions().setRunner(DirectRunner.class);
-
-    PCollection<String> res =
-        p.apply(Create.of("a", "b", "c")).apply(ParDo.of(new SDFWithLifecycle()));
-
-    PAssert.that(res).containsInAnyOrder("a", "b", "c");
-
-    p.run();
-  }
-
-  // TODO (https://issues.apache.org/jira/browse/BEAM-988): Test that Splittable DoFn
-  // emits output immediately (i.e. has a pass-through trigger) regardless of input's
-  // windowing/triggering strategy.
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/887b357f/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 18bf6a7..3e3dd7e 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -54,6 +54,7 @@
                 <configuration>
                   <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
                   <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
+                  <excludedGroups>org.apache.beam.sdk.testing.UsesSplittableParDo</excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>
                   <dependenciesToScan>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/887b357f/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 59276e4..8547499 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -78,6 +78,7 @@
             <id>runnable-on-service-tests</id>
             <configuration>
               <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
+              <excludedGroups>org.apache.beam.sdk.testing.UsesSplittableParDo</excludedGroups>
               <excludes>
                 <exclude>org.apache.beam.sdk.transforms.FlattenTest</exclude>
               </excludes>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/887b357f/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index da7a72a..dc000bf 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -73,6 +73,7 @@
                 <configuration>
                   <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
                   <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
+                  <excludedGroups>org.apache.beam.sdk.testing.UsesSplittableParDo</excludedGroups>
                   <forkCount>1</forkCount>
                   <reuseForks>false</reuseForks>
                   <failIfNoTests>true</failIfNoTests>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/887b357f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDo.java
new file mode 100644
index 0000000..209936f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDo.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * Category tag for validation tests which utilize splittable {@link ParDo}.
+ */
+public interface UsesSplittableParDo {}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/887b357f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
new file mode 100644
index 0000000..82bd3a3
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesSplittableParDo;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.MutableDateTime;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} behavior.
+ */
+@RunWith(JUnit4.class)
+public class SplittableDoFnTest {
+  static class OffsetRange implements Serializable {
+    public final int from;
+    public final int to;
+
+    OffsetRange(int from, int to) {
+      this.from = from;
+      this.to = to;
+    }
+
+    @Override
+    public String toString() {
+      return "OffsetRange{" + "from=" + from + ", to=" + to + '}';
+    }
+  }
+
+  private static class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
+    private OffsetRange range;
+    private Integer lastClaimedIndex = null;
+
+    OffsetRangeTracker(OffsetRange range) {
+      this.range = checkNotNull(range);
+    }
+
+    @Override
+    public OffsetRange currentRestriction() {
+      return range;
+    }
+
+    @Override
+    public OffsetRange checkpoint() {
+      if (lastClaimedIndex == null) {
+        OffsetRange res = range;
+        range = new OffsetRange(range.from, range.from);
+        return res;
+      }
+      OffsetRange res = new OffsetRange(lastClaimedIndex + 1, range.to);
+      this.range = new OffsetRange(range.from, lastClaimedIndex + 1);
+      return res;
+    }
+
+    boolean tryClaim(int i) {
+      checkState(lastClaimedIndex == null || i > lastClaimedIndex);
+      if (i >= range.to) {
+        return false;
+      }
+      lastClaimedIndex = i;
+      return true;
+    }
+  }
+
+  static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
+    @ProcessElement
+    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
+      for (int i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
+        c.output(KV.of(c.element(), i));
+        if (i % 3 == 0) {
+          return ProcessContinuation.resume();
+        }
+      }
+      return ProcessContinuation.stop();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRange(String element) {
+      return new OffsetRange(0, element.length());
+    }
+
+    @SplitRestriction
+    public void splitRange(
+        String element, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
+      receiver.output(new OffsetRange(range.from, (range.from + range.to) / 2));
+      receiver.output(new OffsetRange((range.from + range.to) / 2, range.to));
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(OffsetRange range) {
+      return new OffsetRangeTracker(range);
+    }
+  }
+
+  private static class ReifyTimestampsFn<T> extends DoFn<T, TimestampedValue<T>> {
+    @ProcessElement
+    public void process(ProcessContext c) {
+      c.output(TimestampedValue.of(c.element(), c.timestamp()));
+    }
+  }
+
+  @Test
+  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  public void testPairWithIndexBasic() {
+    Pipeline p = TestPipeline.create();
+    PCollection<KV<String, Integer>> res =
+        p.apply(Create.of("a", "bb", "ccccc"))
+            .apply(ParDo.of(new PairStringWithIndexToLength()))
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
+
+    PAssert.that(res)
+        .containsInAnyOrder(
+            Arrays.asList(
+                KV.of("a", 0),
+                KV.of("bb", 0),
+                KV.of("bb", 1),
+                KV.of("ccccc", 0),
+                KV.of("ccccc", 1),
+                KV.of("ccccc", 2),
+                KV.of("ccccc", 3),
+                KV.of("ccccc", 4)));
+
+    p.run();
+  }
+
+  @Test
+  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  public void testPairWithIndexWindowedTimestamped() {
+    // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps
+    // of elements in the input collection.
+    Pipeline p = TestPipeline.create();
+
+    MutableDateTime mutableNow = Instant.now().toMutableDateTime();
+    mutableNow.setMillisOfSecond(0);
+    Instant now = mutableNow.toInstant();
+    Instant nowP1 = now.plus(Duration.standardSeconds(1));
+    Instant nowP2 = now.plus(Duration.standardSeconds(2));
+
+    SlidingWindows windowFn =
+        SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
+    PCollection<KV<String, Integer>> res =
+        p.apply(
+                Create.timestamped(
+                    TimestampedValue.of("a", now),
+                    TimestampedValue.of("bb", nowP1),
+                    TimestampedValue.of("ccccc", nowP2)))
+            .apply(Window.<String>into(windowFn))
+            .apply(ParDo.of(new PairStringWithIndexToLength()))
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
+
+    assertEquals(windowFn, res.getWindowingStrategy().getWindowFn());
+
+    PCollection<TimestampedValue<KV<String, Integer>>> timestamped =
+        res.apply("Reify timestamps", ParDo.of(new ReifyTimestampsFn<KV<String, Integer>>()));
+
+    for (int i = 0; i < 4; ++i) {
+      Instant base = now.minus(Duration.standardSeconds(i));
+      IntervalWindow window = new IntervalWindow(base, base.plus(Duration.standardSeconds(5)));
+
+      List<TimestampedValue<KV<String, Integer>>> expectedUnfiltered =
+          Arrays.asList(
+              TimestampedValue.of(KV.of("a", 0), now),
+              TimestampedValue.of(KV.of("bb", 0), nowP1),
+              TimestampedValue.of(KV.of("bb", 1), nowP1),
+              TimestampedValue.of(KV.of("ccccc", 0), nowP2),
+              TimestampedValue.of(KV.of("ccccc", 1), nowP2),
+              TimestampedValue.of(KV.of("ccccc", 2), nowP2),
+              TimestampedValue.of(KV.of("ccccc", 3), nowP2),
+              TimestampedValue.of(KV.of("ccccc", 4), nowP2));
+
+      List<TimestampedValue<KV<String, Integer>>> expected = new ArrayList<>();
+      for (TimestampedValue<KV<String, Integer>> tv : expectedUnfiltered) {
+        if (!window.start().isAfter(tv.getTimestamp())
+            && !tv.getTimestamp().isAfter(window.maxTimestamp())) {
+          expected.add(tv);
+        }
+      }
+      assertFalse(expected.isEmpty());
+
+      PAssert.that(timestamped).inWindow(window).containsInAnyOrder(expected);
+    }
+    p.run();
+  }
+
+  private static class SDFWithSideInputsAndOutputs extends DoFn<Integer, String> {
+    private final PCollectionView<String> sideInput;
+    private final TupleTag<String> sideOutput;
+
+    private SDFWithSideInputsAndOutputs(
+        PCollectionView<String> sideInput, TupleTag<String> sideOutput) {
+      this.sideInput = sideInput;
+      this.sideOutput = sideOutput;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
+      checkState(tracker.tryClaim(tracker.currentRestriction().from));
+      String side = c.sideInput(sideInput);
+      c.output("main:" + side + ":" + c.element());
+      c.sideOutput(sideOutput, "side:" + side + ":" + c.element());
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(Integer value) {
+      return new OffsetRange(0, 1);
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(OffsetRange range) {
+      return new OffsetRangeTracker(range);
+    }
+  }
+
+  /**
+   * Runs {@code SDFWithSideInputsAndOutputs} over three integers with a singleton side input
+   * and checks the contents of both the main and the side output.
+   */
+  @Test
+  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  public void testSideInputsAndOutputs() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+
+    PCollectionView<String> sideInput =
+        pipeline.apply("side input", Create.of("foo")).apply(View.<String>asSingleton());
+    TupleTag<String> mainOutputTag = new TupleTag<>("main");
+    TupleTag<String> sideOutputTag = new TupleTag<>("side");
+
+    PCollectionTuple outputs =
+        pipeline
+            .apply("input", Create.of(0, 1, 2))
+            .apply(
+                ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, sideOutputTag))
+                    .withSideInputs(sideInput)
+                    .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+
+    // Coders are set explicitly on both outputs of the multi-output ParDo.
+    PCollection<String> mainOutput = outputs.get(mainOutputTag);
+    mainOutput.setCoder(StringUtf8Coder.of());
+    PCollection<String> sideOutput = outputs.get(sideOutputTag);
+    sideOutput.setCoder(StringUtf8Coder.of());
+
+    List<String> expectedMain = Arrays.asList("main:foo:0", "main:foo:1", "main:foo:2");
+    List<String> expectedSide = Arrays.asList("side:foo:0", "side:foo:1", "side:foo:2");
+
+    PAssert.that(mainOutput).containsInAnyOrder(expectedMain);
+    PAssert.that(sideOutput).containsInAnyOrder(expectedSide);
+
+    pipeline.run();
+  }
+
+  /**
+   * Checks how a splittable DoFn treats late data.
+   *
+   * <p>The stream emits {@code "aa"} on time, advances the watermark, and then emits
+   * {@code "bb"} with a timestamp one hour in the past. The input is windowed into one-minute
+   * fixed windows with one minute of allowed lateness. The test asserts that:
+   *
+   * <ul>
+   *   <li>the splittable DoFn output still contains the entries for both keys, i.e. the DoFn
+   *       itself drops nothing;
+   *   <li>the DoFn output carries the same windowing strategy as its input;
+   *   <li>after a follow-up {@code GroupByKey} only the key {@code "aa"} remains.
+   * </ul>
+   */
+  @Test
+  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  public void testLateData() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    Instant base = Instant.now();
+
+    TestStream<String> stream =
+        TestStream.create(StringUtf8Coder.of())
+            .advanceWatermarkTo(base)
+            .addElements("aa")
+            .advanceWatermarkTo(base.plus(Duration.standardSeconds(5)))
+            // Timestamped an hour before the watermark, far beyond the allowed lateness below.
+            .addElements(TimestampedValue.of("bb", base.minus(Duration.standardHours(1))))
+            .advanceProcessingTime(Duration.standardHours(1))
+            .advanceWatermarkToInfinity();
+
+    PCollection<String> input =
+        p.apply(stream)
+            .apply(
+                Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
+                    .withAllowedLateness(Duration.standardMinutes(1)));
+
+    PCollection<KV<String, Integer>> afterSDF =
+        input
+            .apply(ParDo.of(new PairStringWithIndexToLength()))
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
+
+    PCollection<String> nonLate =
+        afterSDF.apply(GroupByKey.<String, Integer>create()).apply(Keys.<String>create());
+
+    // The splittable DoFn itself should not drop any data and act as pass-through.
+    PAssert.that(afterSDF)
+        .containsInAnyOrder(
+            Arrays.asList(KV.of("aa", 0), KV.of("aa", 1), KV.of("bb", 0), KV.of("bb", 1)));
+
+    // But it should preserve the windowing strategy of the data, including allowed lateness:
+    // the follow-up GBK should drop the late data.
+    // JUnit's assertEquals takes (expected, actual): the input's strategy is the expectation.
+    assertEquals(input.getWindowingStrategy(), afterSDF.getWindowingStrategy());
+    PAssert.that(nonLate).containsInAnyOrder("aa");
+
+    p.run();
+  }
+
+  /**
+   * A splittable {@link DoFn} that tracks which lifecycle phase it is in and asserts, in every
+   * lifecycle method, that the call arrives in the expected phase.
+   *
+   * <p>Expected sequence: setup, then any number of (start bundle, process elements, finish
+   * bundle) rounds, then teardown. Each element is echoed to the output unchanged.
+   */
+  private static class SDFWithLifecycle extends DoFn<String, String> {
+    private enum State {
+      BEFORE_SETUP,
+      OUTSIDE_BUNDLE,
+      INSIDE_BUNDLE,
+      TORN_DOWN
+    }
+
+    private State state = State.BEFORE_SETUP;
+
+    /** Asserts that the current phase is {@code expected}, then moves to {@code next}. */
+    private void advance(State expected, State next) {
+      assertEquals(expected, state);
+      state = next;
+    }
+
+    @Setup
+    public void setUp() {
+      advance(State.BEFORE_SETUP, State.OUTSIDE_BUNDLE);
+    }
+
+    @StartBundle
+    public void startBundle(Context c) {
+      advance(State.OUTSIDE_BUNDLE, State.INSIDE_BUNDLE);
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(String value) {
+      return new OffsetRange(0, 1);
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(OffsetRange range) {
+      return new OffsetRangeTracker(range);
+    }
+
+    /** Must run inside a bundle; claims offset 0 and passes the element through. */
+    @ProcessElement
+    public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
+      assertEquals(State.INSIDE_BUNDLE, state);
+      assertTrue(tracker.tryClaim(0));
+      c.output(c.element());
+    }
+
+    @FinishBundle
+    public void finishBundle(Context c) {
+      advance(State.INSIDE_BUNDLE, State.OUTSIDE_BUNDLE);
+    }
+
+    @Teardown
+    public void tearDown() {
+      advance(State.OUTSIDE_BUNDLE, State.TORN_DOWN);
+    }
+  }
+
+  /**
+   * Runs {@code SDFWithLifecycle} over three strings and checks that they all come out
+   * unchanged; the DoFn itself asserts the ordering of its lifecycle calls.
+   */
+  @Test
+  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  public void testLifecycleMethods() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+
+    PCollection<String> input = pipeline.apply(Create.of("a", "b", "c"));
+    PCollection<String> output = input.apply(ParDo.of(new SDFWithLifecycle()));
+
+    PAssert.that(output).containsInAnyOrder("a", "b", "c");
+
+    pipeline.run();
+  }
+
+  // TODO (https://issues.apache.org/jira/browse/BEAM-988): Test that Splittable DoFn
+  // emits output immediately (i.e. has a pass-through trigger) regardless of input's
+  // windowing/triggering strategy.
+}