You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:40 UTC

[13/28] beam git commit: Revert "[BEAM-2610] This closes #3553"

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 4b5d5f5..171171f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -45,19 +45,14 @@ import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.LargeKeys;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestStream;
-import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -189,40 +184,6 @@ public class GroupByKeyTest {
     p.run();
   }
 
-  /**
-   * Tests that when a processing time timers comes in after a window is expired it does not cause a
-   * spurious output.
-   */
-  @Test
-  @Category({ValidatesRunner.class, UsesTestStream.class})
-  public void testCombiningAccumulatingProcessingTime() throws Exception {
-    PCollection<Integer> triggeredSums =
-        p.apply(
-                TestStream.create(VarIntCoder.of())
-                    .advanceWatermarkTo(new Instant(0))
-                    .addElements(
-                        TimestampedValue.of(2, new Instant(2)),
-                        TimestampedValue.of(5, new Instant(5)))
-                    .advanceWatermarkTo(new Instant(100))
-                    .advanceProcessingTime(Duration.millis(10))
-                    .advanceWatermarkToInfinity())
-            .apply(
-                Window.<Integer>into(FixedWindows.of(Duration.millis(100)))
-                    .withTimestampCombiner(TimestampCombiner.EARLIEST)
-                    .accumulatingFiredPanes()
-                    .withAllowedLateness(Duration.ZERO)
-                    .triggering(
-                        Repeatedly.forever(
-                            AfterProcessingTime.pastFirstElementInPane()
-                                .plusDelayOf(Duration.millis(10)))))
-            .apply(Sum.integersGlobally().withoutDefaults());
-
-    PAssert.that(triggeredSums)
-        .containsInAnyOrder(7);
-
-    p.run();
-  }
-
   @Test
   public void testGroupByKeyNonDeterministic() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index fa4949e..c67cf2a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -62,8 +62,6 @@ import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
 import org.apache.beam.sdk.state.MapState;
@@ -1593,108 +1591,6 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  public void testStateNotKeyed() {
-    final String stateId = "foo";
-
-    DoFn<String, Integer> fn =
-        new DoFn<String, Integer>() {
-
-          @StateId(stateId)
-          private final StateSpec<ValueState<Integer>> intState =
-              StateSpecs.value();
-
-          @ProcessElement
-          public void processElement(
-              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {}
-        };
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("state");
-    thrown.expectMessage("KvCoder");
-
-    pipeline.apply(Create.of("hello", "goodbye", "hello again")).apply(ParDo.of(fn));
-  }
-
-  @Test
-  public void testStateNotDeterministic() {
-    final String stateId = "foo";
-
-    // DoubleCoder is not deterministic, so this should crash
-    DoFn<KV<Double, String>, Integer> fn =
-        new DoFn<KV<Double, String>, Integer>() {
-
-          @StateId(stateId)
-          private final StateSpec<ValueState<Integer>> intState =
-              StateSpecs.value();
-
-          @ProcessElement
-          public void processElement(
-              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {}
-        };
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("state");
-    thrown.expectMessage("deterministic");
-
-    pipeline
-        .apply(Create.of(KV.of(1.0, "hello"), KV.of(5.4, "goodbye"), KV.of(7.2, "hello again")))
-        .apply(ParDo.of(fn));
-  }
-
-  @Test
-  public void testTimerNotKeyed() {
-    final String timerId = "foo";
-
-    DoFn<String, Integer> fn =
-        new DoFn<String, Integer>() {
-
-          @TimerId(timerId)
-          private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
-          @ProcessElement
-          public void processElement(
-              ProcessContext c, @TimerId(timerId) Timer timer) {}
-
-          @OnTimer(timerId)
-          public void onTimer() {}
-        };
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("timer");
-    thrown.expectMessage("KvCoder");
-
-    pipeline.apply(Create.of("hello", "goodbye", "hello again")).apply(ParDo.of(fn));
-  }
-
-  @Test
-  public void testTimerNotDeterministic() {
-    final String timerId = "foo";
-
-    // DoubleCoder is not deterministic, so this should crash
-    DoFn<KV<Double, String>, Integer> fn =
-        new DoFn<KV<Double, String>, Integer>() {
-
-          @TimerId(timerId)
-          private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
-          @ProcessElement
-          public void processElement(
-              ProcessContext c, @TimerId(timerId) Timer timer) {}
-
-          @OnTimer(timerId)
-          public void onTimer() {}
-        };
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("timer");
-    thrown.expectMessage("deterministic");
-
-    pipeline
-        .apply(Create.of(KV.of(1.0, "hello"), KV.of(5.4, "goodbye"), KV.of(7.2, "hello again")))
-        .apply(ParDo.of(fn));
-  }
-
-  @Test
   @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testValueStateCoderInference() {
     final String stateId = "foo";
@@ -3046,65 +2942,4 @@ public class ParDoTest implements Serializable {
 
     // If it doesn't crash, we made it!
   }
-
-  /** A {@link PipelineOptions} subclass for testing passing to a {@link DoFn}. */
-  public interface MyOptions extends PipelineOptions {
-    @Default.String("fake option")
-    String getFakeOption();
-    void setFakeOption(String value);
-  }
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testPipelineOptionsParameter() {
-    PCollection<String> results = pipeline
-        .apply(Create.of(1))
-        .apply(
-            ParDo.of(
-                new DoFn<Integer, String>() {
-                  @ProcessElement
-                  public void process(ProcessContext c, PipelineOptions options) {
-                    c.output(options.as(MyOptions.class).getFakeOption());
-                  }
-                }));
-
-    String testOptionValue = "not fake anymore";
-    pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue);
-    PAssert.that(results).containsInAnyOrder("not fake anymore");
-
-    pipeline.run();
-  }
-
-  @Test
-  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
-  public void testPipelineOptionsParameterOnTimer() {
-    final String timerId = "thisTimer";
-
-    PCollection<String> results =
-        pipeline
-            .apply(Create.of(KV.of(0, 0)))
-            .apply(
-                ParDo.of(
-                    new DoFn<KV<Integer, Integer>, String>() {
-                      @TimerId(timerId)
-                      private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
-                      @ProcessElement
-                      public void process(
-                          ProcessContext c, BoundedWindow w, @TimerId(timerId) Timer timer) {
-                        timer.set(w.maxTimestamp());
-                      }
-
-                      @OnTimer(timerId)
-                      public void onTimer(OnTimerContext c, PipelineOptions options) {
-                        c.output(options.as(MyOptions.class).getFakeOption());
-                      }
-                    }));
-
-    String testOptionValue = "not fake anymore";
-    pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue);
-    PAssert.that(results).containsInAnyOrder("not fake anymore");
-
-    pipeline.run();
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index d2d2529..02a44d2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -18,14 +18,10 @@
 package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.sdk.testing.TestPipeline.testingPipelineOptions;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import com.google.common.collect.Ordering;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,9 +29,6 @@ import java.util.List;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.range.OffsetRange;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
@@ -44,6 +37,7 @@ import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs;
 import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -73,16 +67,10 @@ public class SplittableDoFnTest implements Serializable {
 
   static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
     @ProcessElement
-    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
-      for (long i = tracker.currentRestriction().getFrom(), numIterations = 0;
-          tracker.tryClaim(i);
-          ++i, ++numIterations) {
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
+      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
         c.output(KV.of(c.element(), (int) i));
-        if (numIterations % 3 == 0) {
-          return resume();
-        }
       }
-      return stop();
     }
 
     @GetInitialRestriction
@@ -105,25 +93,8 @@ public class SplittableDoFnTest implements Serializable {
     }
   }
 
-  private static PipelineOptions streamingTestPipelineOptions() {
-    // Using testing options with streaming=true makes it possible to enable UsesSplittableParDo
-    // tests in Dataflow runner, because as of writing, it can run Splittable DoFn only in
-    // streaming mode.
-    // This is a no-op for other runners currently (Direct runner doesn't care, and other
-    // runners don't implement SDF at all yet).
-    //
-    // This is a workaround until https://issues.apache.org/jira/browse/BEAM-1620
-    // is properly implemented and supports marking tests as streaming-only.
-    //
-    // https://issues.apache.org/jira/browse/BEAM-2483 specifically tracks the removal of the
-    // current workaround.
-    PipelineOptions options = testingPipelineOptions();
-    options.as(StreamingOptions.class).setStreaming(true);
-    return options;
-  }
-
   @Rule
-  public final transient TestPipeline p = TestPipeline.fromOptions(streamingTestPipelineOptions());
+  public final transient TestPipeline p = TestPipeline.create();
 
   @Test
   @Category({ValidatesRunner.class, UsesSplittableParDo.class})
@@ -211,12 +182,6 @@ public class SplittableDoFnTest implements Serializable {
   private static class SDFWithMultipleOutputsPerBlock extends DoFn<String, Integer> {
     private static final int MAX_INDEX = 98765;
 
-    private final int numClaimsPerCall;
-
-    private SDFWithMultipleOutputsPerBlock(int numClaimsPerCall) {
-      this.numClaimsPerCall = numClaimsPerCall;
-    }
-
     private static int snapToNextBlock(int index, int[] blockStarts) {
       for (int i = 1; i < blockStarts.length; ++i) {
         if (index > blockStarts[i - 1] && index <= blockStarts[i]) {
@@ -227,20 +192,14 @@ public class SplittableDoFnTest implements Serializable {
     }
 
     @ProcessElement
-    public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
+    public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
       int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
       int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
-      for (int i = trueStart, numIterations = 1;
-          tracker.tryClaim(blockStarts[i]);
-          ++i, ++numIterations) {
+      for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) {
         for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
           c.output(index);
         }
-        if (numIterations == numClaimsPerCall) {
-          return resume();
-        }
       }
-      return stop();
     }
 
     @GetInitialRestriction
@@ -253,7 +212,7 @@ public class SplittableDoFnTest implements Serializable {
   @Category({ValidatesRunner.class, UsesSplittableParDo.class})
   public void testOutputAfterCheckpoint() throws Exception {
     PCollection<Integer> outputs = p.apply(Create.of("foo"))
-        .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock(3)));
+        .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock()));
     PAssert.thatSingleton(outputs.apply(Count.<Integer>globally()))
         .isEqualTo((long) SDFWithMultipleOutputsPerBlock.MAX_INDEX);
     p.run();
@@ -328,105 +287,9 @@ public class SplittableDoFnTest implements Serializable {
     PAssert.that(res).containsInAnyOrder("a:0", "a:1", "a:2", "a:3", "b:4", "b:5", "b:6", "b:7");
 
     p.run();
-  }
-
-  @BoundedPerElement
-  private static class SDFWithMultipleOutputsPerBlockAndSideInput
-      extends DoFn<Integer, KV<String, Integer>> {
-    private static final int MAX_INDEX = 98765;
-    private final PCollectionView<String> sideInput;
-    private final int numClaimsPerCall;
-
-    public SDFWithMultipleOutputsPerBlockAndSideInput(
-        PCollectionView<String> sideInput, int numClaimsPerCall) {
-      this.sideInput = sideInput;
-      this.numClaimsPerCall = numClaimsPerCall;
-    }
-
-    private static int snapToNextBlock(int index, int[] blockStarts) {
-      for (int i = 1; i < blockStarts.length; ++i) {
-        if (index > blockStarts[i - 1] && index <= blockStarts[i]) {
-          return i;
-        }
-      }
-      throw new IllegalStateException("Shouldn't get here");
-    }
-
-    @ProcessElement
-    public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
-      int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
-      int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
-      for (int i = trueStart, numIterations = 1;
-          tracker.tryClaim(blockStarts[i]);
-          ++i, ++numIterations) {
-        for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
-          c.output(KV.of(c.sideInput(sideInput) + ":" + c.element(), index));
-        }
-        if (numIterations == numClaimsPerCall) {
-          return resume();
-        }
-      }
-      return stop();
-    }
-
-    @GetInitialRestriction
-    public OffsetRange getInitialRange(Integer element) {
-      return new OffsetRange(0, MAX_INDEX);
-    }
-  }
-
-  @Test
-  @Category({
-    ValidatesRunner.class,
-    UsesSplittableParDo.class,
-    UsesSplittableParDoWithWindowedSideInputs.class
-  })
-  public void testWindowedSideInputWithCheckpoints() throws Exception {
-    PCollection<Integer> mainInput =
-        p.apply("main",
-                Create.timestamped(
-                    TimestampedValue.of(0, new Instant(0)),
-                    TimestampedValue.of(1, new Instant(1)),
-                    TimestampedValue.of(2, new Instant(2)),
-                    TimestampedValue.of(3, new Instant(3))))
-            .apply("window 1", Window.<Integer>into(FixedWindows.of(Duration.millis(1))));
-
-    PCollectionView<String> sideInput =
-        p.apply("side",
-                Create.timestamped(
-                    TimestampedValue.of("a", new Instant(0)),
-                    TimestampedValue.of("b", new Instant(2))))
-            .apply("window 2", Window.<String>into(FixedWindows.of(Duration.millis(2))))
-            .apply("singleton", View.<String>asSingleton());
-
-    PCollection<KV<String, Integer>> res =
-        mainInput.apply(
-            ParDo.of(
-                    new SDFWithMultipleOutputsPerBlockAndSideInput(
-                        sideInput, 3 /* numClaimsPerCall */))
-                .withSideInputs(sideInput));
-    PCollection<KV<String, Iterable<Integer>>> grouped =
-        res.apply(GroupByKey.<String, Integer>create());
-
-    PAssert.that(grouped.apply(Keys.<String>create()))
-        .containsInAnyOrder("a:0", "a:1", "b:2", "b:3");
-    PAssert.that(grouped)
-        .satisfies(
-            new SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void>() {
-              @Override
-              public Void apply(Iterable<KV<String, Iterable<Integer>>> input) {
-                List<Integer> expected = new ArrayList<>();
-                for (int i = 0; i < SDFWithMultipleOutputsPerBlockAndSideInput.MAX_INDEX; ++i) {
-                  expected.add(i);
-                }
-                for (KV<String, Iterable<Integer>> kv : input) {
-                  assertEquals(expected, Ordering.<Integer>natural().sortedCopy(kv.getValue()));
-                }
-                return null;
-              }
-            });
-    p.run();
 
+    // TODO: also add test coverage when the SDF checkpoints - the resumed call should also
+    // properly access side inputs.
     // TODO: also test coverage when some of the windows of the side input are not ready.
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 2098c66..3edb194 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
@@ -91,8 +89,8 @@ public class DoFnInvokersTest {
     when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext);
   }
 
-  private DoFn.ProcessContinuation invokeProcessElement(DoFn<String, String> fn) {
-    return DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider);
+  private void invokeProcessElement(DoFn<String, String> fn) {
+    DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider);
   }
 
   private void invokeOnTimer(String timerId, DoFn<String, String> fn) {
@@ -121,7 +119,7 @@ public class DoFnInvokersTest {
       public void processElement(ProcessContext c) throws Exception {}
     }
     MockFn mockFn = mock(MockFn.class);
-    assertEquals(stop(), invokeProcessElement(mockFn));
+    invokeProcessElement(mockFn);
     verify(mockFn).processElement(mockProcessContext);
   }
 
@@ -142,7 +140,7 @@ public class DoFnInvokersTest {
   public void testDoFnWithProcessElementInterface() throws Exception {
     IdentityUsingInterfaceWithProcessElement fn =
         mock(IdentityUsingInterfaceWithProcessElement.class);
-    assertEquals(stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     verify(fn).processElement(mockProcessContext);
   }
 
@@ -163,14 +161,14 @@ public class DoFnInvokersTest {
   @Test
   public void testDoFnWithMethodInSuperclass() throws Exception {
     IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class);
-    assertEquals(stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     verify(fn).process(mockProcessContext);
   }
 
   @Test
   public void testDoFnWithMethodInSubclass() throws Exception {
     IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class);
-    assertEquals(stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     verify(fn).process(mockProcessContext);
   }
 
@@ -181,7 +179,7 @@ public class DoFnInvokersTest {
       public void processElement(ProcessContext c, IntervalWindow w) throws Exception {}
     }
     MockFn fn = mock(MockFn.class);
-    assertEquals(stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     verify(fn).processElement(mockProcessContext, mockWindow);
   }
 
@@ -205,7 +203,7 @@ public class DoFnInvokersTest {
           throws Exception {}
     }
     MockFn fn = mock(MockFn.class);
-    assertEquals(stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     verify(fn).processElement(mockProcessContext, mockState);
   }
 
@@ -231,35 +229,11 @@ public class DoFnInvokersTest {
       public void onTimer() {}
     }
     MockFn fn = mock(MockFn.class);
-    assertEquals(stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     verify(fn).processElement(mockProcessContext, mockTimer);
   }
 
   @Test
-  public void testDoFnWithReturn() throws Exception {
-    class MockFn extends DoFn<String, String> {
-      @DoFn.ProcessElement
-      public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker)
-          throws Exception {
-        return null;
-      }
-
-      @GetInitialRestriction
-      public SomeRestriction getInitialRestriction(String element) {
-        return null;
-      }
-
-      @NewTracker
-      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
-        return null;
-      }
-    }
-    MockFn fn = mock(MockFn.class);
-    when(fn.processElement(mockProcessContext, null)).thenReturn(resume());
-    assertEquals(resume(), invokeProcessElement(fn));
-  }
-
-  @Test
   public void testDoFnWithStartBundleSetupTeardown() throws Exception {
     class MockFn extends DoFn<String, String> {
       @ProcessElement
@@ -314,9 +288,7 @@ public class DoFnInvokersTest {
   /** Public so Mockito can do "delegatesTo()" in the test below. */
   public static class MockFn extends DoFn<String, String> {
     @ProcessElement
-    public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) {
-      return null;
-    }
+    public void processElement(ProcessContext c, SomeRestrictionTracker tracker) {}
 
     @GetInitialRestriction
     public SomeRestriction getInitialRestriction(String element) {
@@ -368,7 +340,7 @@ public class DoFnInvokersTest {
         .splitRestriction(
             eq("blah"), same(restriction), Mockito.<DoFn.OutputReceiver<SomeRestriction>>any());
     when(fn.newTracker(restriction)).thenReturn(tracker);
-    when(fn.processElement(mockProcessContext, tracker)).thenReturn(resume());
+    fn.processElement(mockProcessContext, tracker);
 
     assertEquals(coder, invoker.invokeGetRestrictionCoder(CoderRegistry.createDefault()));
     assertEquals(restriction, invoker.invokeGetInitialRestriction("blah"));
@@ -384,8 +356,6 @@ public class DoFnInvokersTest {
         });
     assertEquals(Arrays.asList(part1, part2, part3), outputs);
     assertEquals(tracker, invoker.invokeNewTracker(restriction));
-    assertEquals(
-        resume(),
         invoker.invokeProcessElement(
             new FakeArgumentProvider<String, String>() {
               @Override
@@ -397,7 +367,7 @@ public class DoFnInvokersTest {
               public RestrictionTracker<?> restrictionTracker() {
                 return tracker;
               }
-            }));
+            });
   }
 
   private static class RestrictionWithDefaultTracker
@@ -471,7 +441,7 @@ public class DoFnInvokersTest {
             assertEquals("foo", output);
           }
         });
-    assertEquals(stop(), invoker.invokeProcessElement(mockArgumentProvider));
+    invoker.invokeProcessElement(mockArgumentProvider);
     assertThat(
         invoker.invokeNewTracker(new RestrictionWithDefaultTracker()),
         instanceOf(DefaultTracker.class));
@@ -561,14 +531,14 @@ public class DoFnInvokersTest {
   @Test
   public void testLocalPrivateDoFnClass() throws Exception {
     PrivateDoFnClass fn = mock(PrivateDoFnClass.class);
-    assertEquals(stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     verify(fn).processThis(mockProcessContext);
   }
 
   @Test
   public void testStaticPackagePrivateDoFnClass() throws Exception {
     DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass());
-    assertEquals(stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockProcessContext);
   }
 
@@ -576,28 +546,28 @@ public class DoFnInvokersTest {
   public void testInnerPackagePrivateDoFnClass() throws Exception {
     DoFn<String, String> fn =
         mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass());
-    assertEquals(stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockProcessContext);
   }
 
   @Test
   public void testStaticPrivateDoFnClass() throws Exception {
     DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass());
-    assertEquals(stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockProcessContext);
   }
 
   @Test
   public void testInnerPrivateDoFnClass() throws Exception {
     DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass());
-    assertEquals(stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockProcessContext);
   }
 
   @Test
   public void testAnonymousInnerDoFn() throws Exception {
     DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass());
-    assertEquals(stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockProcessContext);
   }
 
@@ -634,31 +604,6 @@ public class DoFnInvokersTest {
   }
 
   @Test
-  public void testProcessElementExceptionWithReturn() throws Exception {
-    thrown.expect(UserCodeException.class);
-    thrown.expectMessage("bogus");
-    DoFnInvokers.invokerFor(
-            new DoFn<Integer, Integer>() {
-              @ProcessElement
-              public ProcessContinuation processElement(
-                  @SuppressWarnings("unused") ProcessContext c, SomeRestrictionTracker tracker) {
-                throw new IllegalArgumentException("bogus");
-              }
-
-              @GetInitialRestriction
-              public SomeRestriction getInitialRestriction(Integer element) {
-                return null;
-              }
-
-              @NewTracker
-              public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
-                return null;
-              }
-            })
-        .invokeProcessElement(new FakeArgumentProvider<Integer, Integer>());
-  }
-
-  @Test
   public void testStartBundleException() throws Exception {
     DoFnInvoker<Integer, Integer> invoker =
         DoFnInvokers.invokerFor(

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
index 44ae5c4..d321f54 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
@@ -50,7 +50,7 @@ public class DoFnSignaturesProcessElementTest {
   @Test
   public void testBadReturnType() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Must return void or ProcessContinuation");
+    thrown.expectMessage("Must return void");
 
     analyzeProcessElementMethod(
         new AnonymousMethod() {

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 08af65e..07b3348 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -52,8 +52,7 @@ import org.junit.runners.JUnit4;
 public class DoFnSignaturesSplittableDoFnTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
 
-  private abstract static class SomeRestriction
-      implements HasDefaultTracker<SomeRestriction, SomeRestrictionTracker> {}
+  private static class SomeRestriction {}
 
   private abstract static class SomeRestrictionTracker
       implements RestrictionTracker<SomeRestriction> {}
@@ -61,20 +60,6 @@ public class DoFnSignaturesSplittableDoFnTest {
   private abstract static class SomeRestrictionCoder extends StructuredCoder<SomeRestriction> {}
 
   @Test
-  public void testReturnsProcessContinuation() throws Exception {
-    DoFnSignature.ProcessElementMethod signature =
-        analyzeProcessElementMethod(
-            new AnonymousMethod() {
-              private DoFn.ProcessContinuation method(
-                  DoFn<Integer, String>.ProcessContext context) {
-                return null;
-              }
-            });
-
-    assertTrue(signature.hasReturnValue());
-  }
-
-  @Test
   public void testHasRestrictionTracker() throws Exception {
     DoFnSignature.ProcessElementMethod signature =
         analyzeProcessElementMethod(
@@ -115,6 +100,11 @@ public class DoFnSignaturesSplittableDoFnTest {
       public SomeRestriction getInitialRestriction(Integer element) {
         return null;
       }
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
     }
 
     @BoundedPerElement
@@ -140,55 +130,6 @@ public class DoFnSignaturesSplittableDoFnTest {
             .isBoundedPerElement());
   }
 
-  private static class BaseFnWithoutContinuation extends DoFn<Integer, String> {
-    @ProcessElement
-    public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
-
-    @GetInitialRestriction
-    public SomeRestriction getInitialRestriction(Integer element) {
-      return null;
-    }
-  }
-
-  private static class BaseFnWithContinuation extends DoFn<Integer, String> {
-    @ProcessElement
-    public ProcessContinuation processElement(
-        ProcessContext context, SomeRestrictionTracker tracker) {
-      return null;
-    }
-
-    @GetInitialRestriction
-    public SomeRestriction getInitialRestriction(Integer element) {
-      return null;
-    }
-  }
-
-  @Test
-  public void testSplittableBoundednessInferredFromReturnValue() throws Exception {
-    assertEquals(
-        PCollection.IsBounded.BOUNDED,
-        DoFnSignatures.getSignature(BaseFnWithoutContinuation.class).isBoundedPerElement());
-    assertEquals(
-        PCollection.IsBounded.UNBOUNDED,
-        DoFnSignatures.getSignature(BaseFnWithContinuation.class).isBoundedPerElement());
-  }
-
-  @Test
-  public void testSplittableRespectsBoundednessAnnotation() throws Exception {
-    @BoundedPerElement
-    class BoundedFnWithContinuation extends BaseFnWithContinuation {}
-
-    assertEquals(
-        PCollection.IsBounded.BOUNDED,
-        DoFnSignatures.getSignature(BoundedFnWithContinuation.class).isBoundedPerElement());
-
-    @UnboundedPerElement
-    class UnboundedFnWithContinuation extends BaseFnWithContinuation {}
-
-    assertEquals(
-        PCollection.IsBounded.UNBOUNDED,
-        DoFnSignatures.getSignature(UnboundedFnWithContinuation.class).isBoundedPerElement());
-  }
   @Test
   public void testUnsplittableIsBounded() throws Exception {
     class UnsplittableFn extends DoFn<Integer, String> {
@@ -231,10 +172,8 @@ public class DoFnSignaturesSplittableDoFnTest {
   public void testSplittableWithAllFunctions() throws Exception {
     class GoodSplittableDoFn extends DoFn<Integer, String> {
       @ProcessElement
-      public ProcessContinuation processElement(
-          ProcessContext context, SomeRestrictionTracker tracker) {
-        return null;
-      }
+      public void processElement(
+          ProcessContext context, SomeRestrictionTracker tracker) {}
 
       @GetInitialRestriction
       public SomeRestriction getInitialRestriction(Integer element) {
@@ -259,7 +198,6 @@ public class DoFnSignaturesSplittableDoFnTest {
     DoFnSignature signature = DoFnSignatures.getSignature(GoodSplittableDoFn.class);
     assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
     assertTrue(signature.processElement().isSplittable());
-    assertTrue(signature.processElement().hasReturnValue());
     assertEquals(
         SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
     assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());
@@ -276,9 +214,7 @@ public class DoFnSignaturesSplittableDoFnTest {
   public void testSplittableWithAllFunctionsGeneric() throws Exception {
     class GoodGenericSplittableDoFn<RestrictionT, TrackerT, CoderT> extends DoFn<Integer, String> {
       @ProcessElement
-      public ProcessContinuation processElement(ProcessContext context, TrackerT tracker) {
-        return null;
-      }
+      public void processElement(ProcessContext context, TrackerT tracker) {}
 
       @GetInitialRestriction
       public RestrictionT getInitialRestriction(Integer element) {
@@ -306,7 +242,6 @@ public class DoFnSignaturesSplittableDoFnTest {
                 SomeRestriction, SomeRestrictionTracker, SomeRestrictionCoder>() {}.getClass());
     assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
     assertTrue(signature.processElement().isSplittable());
-    assertTrue(signature.processElement().hasReturnValue());
     assertEquals(
         SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
     assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 70c8dfd..cffb0ad 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -29,7 +29,6 @@ import static org.junit.Assert.fail;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.TimeDomain;
@@ -330,19 +329,6 @@ public class DoFnSignaturesTest {
   }
 
   @Test
-  public void testPipelineOptionsParameter() throws Exception {
-    DoFnSignature sig =
-        DoFnSignatures.getSignature(new DoFn<String, String>() {
-          @ProcessElement
-          public void process(ProcessContext c, PipelineOptions options) {}
-        }.getClass());
-
-    assertThat(
-        sig.processElement().extraParameters(),
-        Matchers.<Parameter>hasItem(instanceOf(Parameter.PipelineOptionsParameter.class)));
-  }
-
-  @Test
   public void testDeclAndUsageOfTimerInSuperclass() throws Exception {
     DoFnSignature sig =
         DoFnSignatures.getSignature(new DoFnOverridingAbstractTimerUse().getClass());

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
index 8aed6b9..831894c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.beam.sdk.io.range.OffsetRange;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
index bfd01f0..b14e221 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
@@ -21,7 +21,6 @@ import static org.apache.beam.sdk.testing.WindowFnTestUtils.runWindowFn;
 import static org.apache.beam.sdk.testing.WindowFnTestUtils.set;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -56,12 +55,11 @@ public class SlidingWindowsTest {
     expected.put(new IntervalWindow(new Instant(0), new Instant(10)), set(1, 2, 5, 9));
     expected.put(new IntervalWindow(new Instant(5), new Instant(15)), set(5, 9, 10, 11));
     expected.put(new IntervalWindow(new Instant(10), new Instant(20)), set(10, 11));
-    SlidingWindows windowFn = SlidingWindows.of(new Duration(10)).every(new Duration(5));
     assertEquals(
         expected,
-        runWindowFn(windowFn,
+        runWindowFn(
+            SlidingWindows.of(new Duration(10)).every(new Duration(5)),
             Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L)));
-    assertThat(windowFn.assignsToOneWindow(), is(false));
   }
 
   @Test
@@ -71,27 +69,11 @@ public class SlidingWindowsTest {
     expected.put(new IntervalWindow(new Instant(0), new Instant(7)), set(1, 2, 5));
     expected.put(new IntervalWindow(new Instant(5), new Instant(12)), set(5, 9, 10, 11));
     expected.put(new IntervalWindow(new Instant(10), new Instant(17)), set(10, 11));
-    SlidingWindows windowFn = SlidingWindows.of(new Duration(7)).every(new Duration(5));
-    assertEquals(
-        expected,
-        runWindowFn(windowFn,
-            Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L)));
-    assertThat(windowFn.assignsToOneWindow(), is(false));
-  }
-
-  @Test
-  public void testEqualSize() throws Exception {
-    Map<IntervalWindow, Set<String>> expected = new HashMap<>();
-    expected.put(new IntervalWindow(new Instant(0), new Instant(3)), set(1, 2));
-    expected.put(new IntervalWindow(new Instant(3), new Instant(6)), set(3, 4, 5));
-    expected.put(new IntervalWindow(new Instant(6), new Instant(9)), set(6, 7));
-    SlidingWindows windowFn = SlidingWindows.of(new Duration(3)).every(new Duration(3));
     assertEquals(
         expected,
         runWindowFn(
-            windowFn,
-            Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L)));
-    assertThat(windowFn.assignsToOneWindow(), is(true));
+            SlidingWindows.of(new Duration(7)).every(new Duration(5)),
+            Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L)));
   }
 
   @Test
@@ -100,14 +82,12 @@ public class SlidingWindowsTest {
     expected.put(new IntervalWindow(new Instant(0), new Instant(3)), set(1, 2));
     expected.put(new IntervalWindow(new Instant(10), new Instant(13)), set(10, 11));
     expected.put(new IntervalWindow(new Instant(100), new Instant(103)), set(100));
-    SlidingWindows windowFn = SlidingWindows.of(new Duration(3)).every(new Duration(10));
     assertEquals(
         expected,
         runWindowFn(
             // Only look at the first 3 millisecs of every 10-millisec interval.
-            windowFn,
+            SlidingWindows.of(new Duration(3)).every(new Duration(10)),
             Arrays.asList(1L, 2L, 3L, 5L, 9L, 10L, 11L, 100L)));
-    assertThat(windowFn.assignsToOneWindow(), is(true));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/google-cloud-platform-core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/pom.xml b/sdks/java/extensions/google-cloud-platform-core/pom.xml
index 7d54990..e4e951b 100644
--- a/sdks/java/extensions/google-cloud-platform-core/pom.xml
+++ b/sdks/java/extensions/google-cloud-platform-core/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-extensions-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index d7205bf..8d1fe74 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -135,7 +135,7 @@ public class GcsUtil {
   private static final int MAX_CONCURRENT_BATCHES = 256;
 
   private static final FluentBackoff BACKOFF_FACTORY =
-      FluentBackoff.DEFAULT.withMaxRetries(10).withInitialBackoff(Duration.standardSeconds(1));
+      FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
 
   /////////////////////////////////////////////////////////////////////////////
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
index fd908cf..e5b48d3 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
@@ -17,9 +17,8 @@
  */
 package org.apache.beam.sdk.util;
 
-import static com.google.api.client.util.BackOffUtils.next;
-
-import com.google.api.client.http.HttpIOExceptionHandler;
+import com.google.api.client.http.HttpBackOffIOExceptionHandler;
+import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
 import com.google.api.client.http.HttpRequest;
 import com.google.api.client.http.HttpRequestInitializer;
 import com.google.api.client.http.HttpResponse;
@@ -61,106 +60,64 @@ public class RetryHttpRequestInitializer implements HttpRequestInitializer {
    */
   private static final int HANGING_GET_TIMEOUT_SEC = 80;
 
-  /** Handlers used to provide additional logging information on unsuccessful HTTP requests. */
-  private static class LoggingHttpBackOffHandler
-      implements HttpIOExceptionHandler, HttpUnsuccessfulResponseHandler {
-
-    private final Sleeper sleeper;
-    private final BackOff ioExceptionBackOff;
-    private final BackOff unsuccessfulResponseBackOff;
-    private final Set<Integer> ignoredResponseCodes;
-    private int ioExceptionRetries;
-    private int unsuccessfulResponseRetries;
-
-    private LoggingHttpBackOffHandler(
-        Sleeper sleeper,
-        BackOff ioExceptionBackOff,
-        BackOff unsucessfulResponseBackOff,
-        Set<Integer> ignoredResponseCodes) {
-      this.sleeper = sleeper;
-      this.ioExceptionBackOff = ioExceptionBackOff;
-      this.unsuccessfulResponseBackOff = unsucessfulResponseBackOff;
-      this.ignoredResponseCodes = ignoredResponseCodes;
+  private static class LoggingHttpBackOffIOExceptionHandler
+      extends HttpBackOffIOExceptionHandler {
+    public LoggingHttpBackOffIOExceptionHandler(BackOff backOff) {
+      super(backOff);
     }
 
     @Override
     public boolean handleIOException(HttpRequest request, boolean supportsRetry)
         throws IOException {
-      // We will retry if the request supports retry or the backoff was successful.
-      // Note that the order of these checks is important since
-      // backOffWasSuccessful will perform a sleep.
-      boolean willRetry = supportsRetry && backOffWasSuccessful(ioExceptionBackOff);
+      boolean willRetry = super.handleIOException(request, supportsRetry);
       if (willRetry) {
-        ioExceptionRetries += 1;
         LOG.debug("Request failed with IOException, will retry: {}", request.getUrl());
       } else {
-        String message = "Request failed with IOException, "
-            + "performed {} retries due to IOExceptions, "
-            + "performed {} retries due to unsuccessful status codes, "
-            + "HTTP framework says request {} be retried, "
-            + "(caller responsible for retrying): {}";
-        LOG.warn(message,
-            ioExceptionRetries,
-            unsuccessfulResponseRetries,
-            supportsRetry ? "can" : "cannot",
+        LOG.warn(
+            "Request failed with IOException (caller responsible for retrying): {}",
             request.getUrl());
       }
       return willRetry;
     }
+  }
+
+  private static class LoggingHttpBackoffUnsuccessfulResponseHandler
+      implements HttpUnsuccessfulResponseHandler {
+    private final HttpBackOffUnsuccessfulResponseHandler handler;
+    private final Set<Integer> ignoredResponseCodes;
+
+    public LoggingHttpBackoffUnsuccessfulResponseHandler(BackOff backoff,
+        Sleeper sleeper, Set<Integer> ignoredResponseCodes) {
+      this.ignoredResponseCodes = ignoredResponseCodes;
+      handler = new HttpBackOffUnsuccessfulResponseHandler(backoff);
+      handler.setSleeper(sleeper);
+      handler.setBackOffRequired(
+          new HttpBackOffUnsuccessfulResponseHandler.BackOffRequired() {
+            @Override
+            public boolean isRequired(HttpResponse response) {
+              int statusCode = response.getStatusCode();
+              return (statusCode / 100 == 5) ||  // 5xx: server error
+                  statusCode == 429;             // 429: Too many requests
+            }
+          });
+    }
 
     @Override
-    public boolean handleResponse(HttpRequest request, HttpResponse response, boolean supportsRetry)
-        throws IOException {
-      // We will retry if the request supports retry and the status code requires a backoff
-      // and the backoff was successful. Note that the order of these checks is important since
-      // backOffWasSuccessful will perform a sleep.
-      boolean willRetry = supportsRetry
-          && retryOnStatusCode(response.getStatusCode())
-          && backOffWasSuccessful(unsuccessfulResponseBackOff);
-      if (willRetry) {
-        unsuccessfulResponseRetries += 1;
+    public boolean handleResponse(HttpRequest request, HttpResponse response,
+        boolean supportsRetry) throws IOException {
+      boolean retry = handler.handleResponse(request, response, supportsRetry);
+      if (retry) {
         LOG.debug("Request failed with code {}, will retry: {}",
             response.getStatusCode(), request.getUrl());
-      } else {
-        String message = "Request failed with code {}, "
-            + "performed {} retries due to IOExceptions, "
-            + "performed {} retries due to unsuccessful status codes, "
-            + "HTTP framework says request {} be retried, "
-            + "(caller responsible for retrying): {}";
-        if (ignoredResponseCodes.contains(response.getStatusCode())) {
-          // Log ignored response codes at a lower level
-          LOG.debug(message,
-              response.getStatusCode(),
-              ioExceptionRetries,
-              unsuccessfulResponseRetries,
-              supportsRetry ? "can" : "cannot",
-              request.getUrl());
-        } else {
-          LOG.warn(message,
-              response.getStatusCode(),
-              ioExceptionRetries,
-              unsuccessfulResponseRetries,
-              supportsRetry ? "can" : "cannot",
-              request.getUrl());
-        }
-      }
-      return willRetry;
-    }
 
-    /** Returns true iff performing the backoff was successful. */
-    private boolean backOffWasSuccessful(BackOff backOff) {
-      try {
-        return next(sleeper, backOff);
-      } catch (InterruptedException | IOException e) {
-        return false;
+      } else if (!ignoredResponseCodes.contains(response.getStatusCode())) {
+        LOG.warn(
+            "Request failed with code {} (caller responsible for retrying): {}",
+            response.getStatusCode(),
+            request.getUrl());
       }
-    }
 
-    /** Returns true iff the {@code statusCode} represents an error that should be retried. */
-    private boolean retryOnStatusCode(int statusCode) {
-      return (statusCode == 0) // Code 0 usually means no response / network error
-          || (statusCode / 100 == 5) // 5xx: server error
-          || statusCode == 429; // 429: Too many requests
+      return retry;
     }
   }
 
@@ -216,20 +173,20 @@ public class RetryHttpRequestInitializer implements HttpRequestInitializer {
     // TODO: Do this exclusively for work requests.
     request.setReadTimeout(HANGING_GET_TIMEOUT_SEC * 1000);
 
-    LoggingHttpBackOffHandler loggingHttpBackOffHandler = new LoggingHttpBackOffHandler(
-        sleeper,
-        // Retry immediately on IOExceptions.
-        BackOff.ZERO_BACKOFF,
-        // Back off on retryable http errors.
+    // Back off on retryable http errors.
+    request.setUnsuccessfulResponseHandler(
         // A back-off multiplier of 2 raises the maximum request retrying time
         // to approximately 5 minutes (keeping other back-off parameters to
         // their default values).
-        new ExponentialBackOff.Builder().setNanoClock(nanoClock).setMultiplier(2).build(),
-        ignoredResponseCodes
-    );
-
-    request.setUnsuccessfulResponseHandler(loggingHttpBackOffHandler);
-    request.setIOExceptionHandler(loggingHttpBackOffHandler);
+        new LoggingHttpBackoffUnsuccessfulResponseHandler(
+            new ExponentialBackOff.Builder().setNanoClock(nanoClock)
+                                            .setMultiplier(2).build(),
+            sleeper, ignoredResponseCodes));
+
+    // Retry immediately on IOExceptions.
+    LoggingHttpBackOffIOExceptionHandler loggingBackoffHandler =
+        new LoggingHttpBackOffIOExceptionHandler(BackOff.ZERO_BACKOFF);
+    request.setIOExceptionHandler(loggingBackoffHandler);
 
     // Set response initializer
     if (responseInterceptor != null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
index a0d9e4b..625c248 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
@@ -15,16 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.gcp;
+package org.apache.beam;
 
-import static org.apache.beam.sdk.util.ApiSurface.classesInPackage;
-import static org.apache.beam.sdk.util.ApiSurface.containsOnlyClassesMatching;
+import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 import com.google.common.collect.ImmutableSet;
 import java.util.Set;
 import org.apache.beam.sdk.util.ApiSurface;
-import org.hamcrest.Matcher;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -34,32 +32,28 @@ import org.junit.runners.JUnit4;
 public class GcpCoreApiSurfaceTest {
 
   @Test
-  public void testGcpCoreApiSurface() throws Exception {
-    final Package thisPackage = getClass().getPackage();
-    final ClassLoader thisClassLoader = getClass().getClassLoader();
-    final ApiSurface apiSurface =
-        ApiSurface.ofPackage(thisPackage, thisClassLoader)
-            .pruningPattern("org[.]apache[.]beam[.].*Test.*")
-            .pruningPattern("org[.]apache[.]beam[.].*IT")
-            .pruningPattern("java[.]lang.*")
-            .pruningPattern("java[.]util.*");
+  public void testApiSurface() throws Exception {
 
     @SuppressWarnings("unchecked")
-    final Set<Matcher<Class<?>>> allowedClasses =
+    final Set<String> allowed =
         ImmutableSet.of(
-            classesInPackage("com.google.api.client.googleapis"),
-            classesInPackage("com.google.api.client.http"),
-            classesInPackage("com.google.api.client.json"),
-            classesInPackage("com.google.api.client.util"),
-            classesInPackage("com.google.api.services.storage"),
-            classesInPackage("com.google.auth"),
-            classesInPackage("com.fasterxml.jackson.annotation"),
-            classesInPackage("java"),
-            classesInPackage("javax"),
-            classesInPackage("org.apache.beam.sdk"),
-            classesInPackage("org.joda.time")
-        );
+            "org.apache.beam",
+            "com.google.api.client",
+            "com.google.api.services.storage",
+            "com.google.auth",
+            "com.fasterxml.jackson.annotation",
+            "com.fasterxml.jackson.core",
+            "com.fasterxml.jackson.databind",
+            "org.apache.avro",
+            "org.hamcrest",
+            // via DataflowMatchers
+            "org.codehaus.jackson",
+            // via Avro
+            "org.joda.time",
+            "org.junit",
+            "sun.reflect");
 
-    assertThat(apiSurface, containsOnlyClassesMatching(allowedClasses));
+    assertThat(
+        ApiSurface.getSdkApiSurface(getClass().getClassLoader()), containsOnlyPackages(allowed));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
index 13a9309..37551a4 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
@@ -49,11 +49,10 @@ import java.net.SocketTimeoutException;
 import java.security.PrivateKey;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.hamcrest.Matchers;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -68,8 +67,6 @@ import org.mockito.stubbing.Answer;
 @RunWith(JUnit4.class)
 public class RetryHttpRequestInitializerTest {
 
-  @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(RetryHttpRequestInitializer.class);
-
   @Mock private PrivateKey mockPrivateKey;
   @Mock private LowLevelHttpRequest mockLowLevelRequest;
   @Mock private LowLevelHttpResponse mockLowLevelResponse;
@@ -138,7 +135,6 @@ public class RetryHttpRequestInitializerTest {
     verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt());
     verify(mockLowLevelRequest).execute();
     verify(mockLowLevelResponse).getStatusCode();
-    expectedLogs.verifyNotLogged("Request failed");
   }
 
   /**
@@ -157,7 +153,7 @@ public class RetryHttpRequestInitializerTest {
       HttpResponse response = result.executeUnparsed();
       assertNotNull(response);
     } catch (HttpResponseException e) {
-      assertThat(e.getMessage(), Matchers.containsString("403"));
+      Assert.assertThat(e.getMessage(), Matchers.containsString("403"));
     }
 
     verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
@@ -166,7 +162,6 @@ public class RetryHttpRequestInitializerTest {
     verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt());
     verify(mockLowLevelRequest).execute();
     verify(mockLowLevelResponse).getStatusCode();
-    expectedLogs.verifyWarn("Request failed with code 403");
   }
 
   /**
@@ -193,7 +188,6 @@ public class RetryHttpRequestInitializerTest {
     verify(mockLowLevelRequest, times(3)).setTimeout(anyInt(), anyInt());
     verify(mockLowLevelRequest, times(3)).execute();
     verify(mockLowLevelResponse, times(3)).getStatusCode();
-    expectedLogs.verifyDebug("Request failed with code 503");
   }
 
   /**
@@ -217,7 +211,6 @@ public class RetryHttpRequestInitializerTest {
     verify(mockLowLevelRequest, times(2)).setTimeout(anyInt(), anyInt());
     verify(mockLowLevelRequest, times(2)).execute();
     verify(mockLowLevelResponse).getStatusCode();
-    expectedLogs.verifyDebug("Request failed with IOException");
   }
 
   /**
@@ -231,22 +224,19 @@ public class RetryHttpRequestInitializerTest {
       int n = 0;
       @Override
       public Integer answer(InvocationOnMock invocation) {
-        return n++ < retries ? 503 : 9999;
+        return (n++ < retries - 1) ? 503 : 200;
       }});
 
     Storage.Buckets.Get result = storage.buckets().get("test");
-    try {
-      result.executeUnparsed();
-      fail();
-    } catch (Throwable t) {
-    }
+    HttpResponse response = result.executeUnparsed();
+    assertNotNull(response);
 
     verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
-    verify(mockLowLevelRequest, atLeastOnce()).addHeader(anyString(), anyString());
-    verify(mockLowLevelRequest, times(retries + 1)).setTimeout(anyInt(), anyInt());
-    verify(mockLowLevelRequest, times(retries + 1)).execute();
-    verify(mockLowLevelResponse, times(retries + 1)).getStatusCode();
-    expectedLogs.verifyWarn("performed 10 retries due to unsuccessful status codes");
+    verify(mockLowLevelRequest, atLeastOnce()).addHeader(anyString(),
+        anyString());
+    verify(mockLowLevelRequest, times(retries)).setTimeout(anyInt(), anyInt());
+    verify(mockLowLevelRequest, times(retries)).execute();
+    verify(mockLowLevelResponse, times(retries)).getStatusCode();
   }
 
   /**
@@ -286,7 +276,6 @@ public class RetryHttpRequestInitializerTest {
     } catch (Throwable e) {
       assertThat(e, Matchers.<Throwable>instanceOf(SocketTimeoutException.class));
       assertEquals(1 + defaultNumberOfRetries, executeCount.get());
-      expectedLogs.verifyWarn("performed 10 retries due to IOExceptions");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/jackson/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/jackson/pom.xml b/sdks/java/extensions/jackson/pom.xml
index 7fd38e0..4b09c11 100644
--- a/sdks/java/extensions/jackson/pom.xml
+++ b/sdks/java/extensions/jackson/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-extensions-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/join-library/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml
index ea24b75..556ec40 100644
--- a/sdks/java/extensions/join-library/pom.xml
+++ b/sdks/java/extensions/join-library/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-extensions-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index 1222476..3d63626 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/protobuf/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/pom.xml b/sdks/java/extensions/protobuf/pom.xml
index 63855f8..ae909ab 100644
--- a/sdks/java/extensions/protobuf/pom.xml
+++ b/sdks/java/extensions/protobuf/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-extensions-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/sorter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/pom.xml b/sdks/java/extensions/sorter/pom.xml
index 395c73f..9d25f9d 100644
--- a/sdks/java/extensions/sorter/pom.xml
+++ b/sdks/java/extensions/sorter/pom.xml
@@ -22,13 +22,17 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-extensions-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
   <artifactId>beam-sdks-java-extensions-sorter</artifactId>
   <name>Apache Beam :: SDKs :: Java :: Extensions :: Sorter</name>
 
+  <properties>
+    <hadoop.version>2.7.1</hadoop.version>
+  </properties>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>
@@ -38,12 +42,14 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
     
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
     

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index fe5c2f1..61a170a 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -83,11 +83,6 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-core-construction-java</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
     </dependency>
 
@@ -155,21 +150,10 @@
     </dependency>
 
     <dependency>
-      <groupId>joda-time</groupId>
-      <artifactId>joda-time</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>com.google.auto.service</groupId>
-      <artifactId>auto-service</artifactId>
-      <optional>true</optional>
-    </dependency>
-
     <!-- test dependencies -->
     <dependency>
       <groupId>org.hamcrest</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 2a9cef8..e33277a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -18,111 +18,82 @@
 
 package org.apache.beam.fn.harness.control;
 
-import com.google.common.annotations.VisibleForTesting;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Iterables.getOnlyElement;
+
+import com.google.common.collect.Collections2;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.ServiceLoader;
-import java.util.Set;
+import java.util.Objects;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fake.FakeStepContext;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.core.PTransformRunnerFactory;
-import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar;
+import org.apache.beam.runners.core.BeamFnDataReadRunner;
+import org.apache.beam.runners.core.BeamFnDataWriteRunner;
+import org.apache.beam.runners.core.BoundedSourceRunner;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Processes {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s by materializing
- * the set of required runners for each {@link RunnerApi.FunctionSpec},
+ * the set of required runners for each {@link org.apache.beam.fn.v1.BeamFnApi.FunctionSpec},
  * wiring them together based upon the {@code input} and {@code output} map definitions.
  *
  * <p>Finally executes the DAG based graph by starting all runners in reverse topological order,
  * and finishing all runners in forward topological order.
  */
 public class ProcessBundleHandler {
-
   // TODO: What should the initial set of URNs be?
   private static final String DATA_INPUT_URN = "urn:org.apache.beam:source:runner:0.1";
-  public static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1";
+  private static final String DATA_OUTPUT_URN = "urn:org.apache.beam:sink:runner:0.1";
+  private static final String JAVA_DO_FN_URN = "urn:org.apache.beam:dofn:java:0.1";
+  private static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1";
 
   private static final Logger LOG = LoggerFactory.getLogger(ProcessBundleHandler.class);
-  private static final Map<String, PTransformRunnerFactory> REGISTERED_RUNNER_FACTORIES;
-
-  static {
-    Set<Registrar> pipelineRunnerRegistrars =
-        Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
-    pipelineRunnerRegistrars.addAll(
-        Lists.newArrayList(ServiceLoader.load(Registrar.class,
-            ReflectHelpers.findClassLoader())));
-
-    // Load all registered PTransform runner factories.
-    ImmutableMap.Builder<String, PTransformRunnerFactory> builder =
-        ImmutableMap.builder();
-    for (Registrar registrar : pipelineRunnerRegistrars) {
-      builder.putAll(registrar.getPTransformRunnerFactories());
-    }
-    REGISTERED_RUNNER_FACTORIES = builder.build();
-  }
 
   private final PipelineOptions options;
   private final Function<String, Message> fnApiRegistry;
   private final BeamFnDataClient beamFnDataClient;
-  private final Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap;
-  private final PTransformRunnerFactory defaultPTransformRunnerFactory;
-
 
   public ProcessBundleHandler(
       PipelineOptions options,
       Function<String, Message> fnApiRegistry,
       BeamFnDataClient beamFnDataClient) {
-    this(options, fnApiRegistry, beamFnDataClient, REGISTERED_RUNNER_FACTORIES);
-  }
-
-  @VisibleForTesting
-  ProcessBundleHandler(
-      PipelineOptions options,
-      Function<String, Message> fnApiRegistry,
-      BeamFnDataClient beamFnDataClient,
-      Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap) {
     this.options = options;
     this.fnApiRegistry = fnApiRegistry;
     this.beamFnDataClient = beamFnDataClient;
-    this.urnToPTransformRunnerFactoryMap = urnToPTransformRunnerFactoryMap;
-    this.defaultPTransformRunnerFactory = new PTransformRunnerFactory<Object>() {
-      @Override
-      public Object createRunnerForPTransform(
-          PipelineOptions pipelineOptions,
-          BeamFnDataClient beamFnDataClient,
-          String pTransformId,
-          RunnerApi.PTransform pTransform,
-          Supplier<String> processBundleInstructionId,
-          Map<String, RunnerApi.PCollection> pCollections,
-          Map<String, RunnerApi.Coder> coders,
-          Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
-          Consumer<ThrowingRunnable> addStartFunction,
-          Consumer<ThrowingRunnable> addFinishFunction) {
-        throw new IllegalStateException(String.format(
-            "No factory registered for %s, known factories %s",
-            pTransform.getSpec().getUrn(),
-            urnToPTransformRunnerFactoryMap.keySet()));
-      }
-    };
   }
 
   private void createRunnerAndConsumersForPTransformRecursively(
@@ -157,19 +128,115 @@ public class ProcessBundleHandler {
       }
     }
 
-    urnToPTransformRunnerFactoryMap.getOrDefault(
-        pTransform.getSpec().getUrn(), defaultPTransformRunnerFactory)
-        .createRunnerForPTransform(
-            options,
-            beamFnDataClient,
-            pTransformId,
-            pTransform,
-            processBundleInstructionId,
-            processBundleDescriptor.getPcollectionsMap(),
-            processBundleDescriptor.getCodersMap(),
-            pCollectionIdsToConsumers,
-            addStartFunction,
-            addFinishFunction);
+    createRunnerForPTransform(
+        pTransformId,
+        pTransform,
+        processBundleInstructionId,
+        processBundleDescriptor.getPcollectionsMap(),
+        pCollectionIdsToConsumers,
+        addStartFunction,
+        addFinishFunction);
+  }
+
+  protected void createRunnerForPTransform(
+      String pTransformId,
+      RunnerApi.PTransform pTransform,
+      Supplier<String> processBundleInstructionId,
+      Map<String, RunnerApi.PCollection> pCollections,
+      Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
+      Consumer<ThrowingRunnable> addStartFunction,
+      Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
+
+
+    // For every output PCollection, create a map from output name to Consumer
+    ImmutableMap.Builder<String, Collection<ThrowingConsumer<WindowedValue<?>>>>
+        outputMapBuilder = ImmutableMap.builder();
+    for (Map.Entry<String, String> entry : pTransform.getOutputsMap().entrySet()) {
+      outputMapBuilder.put(
+          entry.getKey(),
+          pCollectionIdsToConsumers.get(entry.getValue()));
+    }
+    ImmutableMap<String, Collection<ThrowingConsumer<WindowedValue<?>>>> outputMap =
+        outputMapBuilder.build();
+
+
+    // Based upon the function spec, populate the start/finish/consumer information.
+    RunnerApi.FunctionSpec functionSpec = pTransform.getSpec();
+    ThrowingConsumer<WindowedValue<?>> consumer;
+    switch (functionSpec.getUrn()) {
+      default:
+        BeamFnApi.Target target;
+        RunnerApi.Coder coderSpec;
+        throw new IllegalArgumentException(
+            String.format("Unknown FunctionSpec %s", functionSpec));
+
+      case DATA_OUTPUT_URN:
+        target = BeamFnApi.Target.newBuilder()
+            .setPrimitiveTransformReference(pTransformId)
+            .setName(getOnlyElement(pTransform.getInputsMap().keySet()))
+            .build();
+        coderSpec = (RunnerApi.Coder) fnApiRegistry.apply(
+            pCollections.get(getOnlyElement(pTransform.getInputsMap().values())).getCoderId());
+        BeamFnDataWriteRunner<Object> remoteGrpcWriteRunner =
+            new BeamFnDataWriteRunner<Object>(
+                functionSpec,
+                processBundleInstructionId,
+                target,
+                coderSpec,
+                beamFnDataClient);
+        addStartFunction.accept(remoteGrpcWriteRunner::registerForOutput);
+        consumer = (ThrowingConsumer)
+            (ThrowingConsumer<WindowedValue<Object>>) remoteGrpcWriteRunner::consume;
+        addFinishFunction.accept(remoteGrpcWriteRunner::close);
+        break;
+
+      case DATA_INPUT_URN:
+        target = BeamFnApi.Target.newBuilder()
+            .setPrimitiveTransformReference(pTransformId)
+            .setName(getOnlyElement(pTransform.getOutputsMap().keySet()))
+            .build();
+        coderSpec = (RunnerApi.Coder) fnApiRegistry.apply(
+            pCollections.get(getOnlyElement(pTransform.getOutputsMap().values())).getCoderId());
+        BeamFnDataReadRunner<?> remoteGrpcReadRunner =
+            new BeamFnDataReadRunner<Object>(
+                functionSpec,
+                processBundleInstructionId,
+                target,
+                coderSpec,
+                beamFnDataClient,
+                (Map) outputMap);
+        addStartFunction.accept(remoteGrpcReadRunner::registerInputLocation);
+        consumer = null;
+        addFinishFunction.accept(remoteGrpcReadRunner::blockTillReadFinishes);
+        break;
+
+      case JAVA_DO_FN_URN:
+        DoFnRunner<Object, Object> doFnRunner = createDoFnRunner(functionSpec, (Map) outputMap);
+        addStartFunction.accept(doFnRunner::startBundle);
+        consumer = (ThrowingConsumer)
+            (ThrowingConsumer<WindowedValue<Object>>) doFnRunner::processElement;
+        addFinishFunction.accept(doFnRunner::finishBundle);
+        break;
+
+      case JAVA_SOURCE_URN:
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        BoundedSourceRunner<BoundedSource<Object>, Object> sourceRunner =
+            createBoundedSourceRunner(functionSpec, (Map) outputMap);
+        // TODO: Remove and replace with source being sent across gRPC port
+        addStartFunction.accept(sourceRunner::start);
+        consumer = (ThrowingConsumer)
+            (ThrowingConsumer<WindowedValue<BoundedSource<Object>>>)
+                sourceRunner::runReadLoop;
+        break;
+    }
+
+    // If we created a consumer, add it to the map containing PCollection ids to consumers
+    if (consumer != null) {
+      for (String inputPCollectionId :
+          pTransform.getInputsMap().values()) {
+        pCollectionIdsToConsumers.put(inputPCollectionId, consumer);
+      }
+    }
   }
 
   public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.InstructionRequest request)
@@ -232,4 +299,88 @@ public class ProcessBundleHandler {
 
     return response;
   }
+
+  /**
+   * Converts a {@link org.apache.beam.fn.v1.BeamFnApi.FunctionSpec} into a {@link DoFnRunner}.
+   */
+  private <InputT, OutputT> DoFnRunner<InputT, OutputT> createDoFnRunner(
+      RunnerApi.FunctionSpec functionSpec,
+      Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap) {
+    ByteString serializedFn;
+    try {
+      serializedFn = functionSpec.getParameter().unpack(BytesValue.class).getValue();
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException(
+          String.format("Unable to unwrap DoFn %s", functionSpec), e);
+    }
+    DoFnInfo<?, ?> doFnInfo =
+        (DoFnInfo<?, ?>)
+            SerializableUtils.deserializeFromByteArray(serializedFn.toByteArray(), "DoFnInfo");
+
+    checkArgument(
+        Objects.equals(
+            new HashSet<>(Collections2.transform(outputMap.keySet(), Long::parseLong)),
+            doFnInfo.getOutputMap().keySet()),
+        "Unexpected mismatch between transform output map %s and DoFnInfo output map %s.",
+        outputMap.keySet(),
+        doFnInfo.getOutputMap());
+
+    ImmutableMultimap.Builder<TupleTag<?>,
+                              ThrowingConsumer<WindowedValue<OutputT>>> tagToOutput =
+                              ImmutableMultimap.builder();
+    for (Map.Entry<Long, TupleTag<?>> entry : doFnInfo.getOutputMap().entrySet()) {
+      tagToOutput.putAll(entry.getValue(), outputMap.get(Long.toString(entry.getKey())));
+    }
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    final Map<TupleTag<?>, Collection<ThrowingConsumer<WindowedValue<?>>>> tagBasedOutputMap =
+        (Map) tagToOutput.build().asMap();
+
+    OutputManager outputManager =
+        new OutputManager() {
+          Map<TupleTag<?>, Collection<ThrowingConsumer<WindowedValue<?>>>> tupleTagToOutput =
+              tagBasedOutputMap;
+
+          @Override
+          public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+            try {
+              Collection<ThrowingConsumer<WindowedValue<?>>> consumers =
+                  tupleTagToOutput.get(tag);
+              if (consumers == null) {
+                /* This is a normal case, e.g., if a DoFn has output but that output is not
+                 * consumed. Drop the output. */
+                return;
+              }
+              for (ThrowingConsumer<WindowedValue<?>> consumer : consumers) {
+                consumer.accept(output);
+              }
+            } catch (Throwable t) {
+              throw new RuntimeException(t);
+            }
+          }
+        };
+
+    @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
+    DoFnRunner<InputT, OutputT> runner =
+        DoFnRunners.simpleRunner(
+            PipelineOptionsFactory.create(), /* TODO */
+            (DoFn) doFnInfo.getDoFn(),
+            NullSideInputReader.empty(), /* TODO */
+            outputManager,
+            (TupleTag) doFnInfo.getOutputMap().get(doFnInfo.getMainOutput()),
+            new ArrayList<>(doFnInfo.getOutputMap().values()),
+            new FakeStepContext(),
+            (WindowingStrategy) doFnInfo.getWindowingStrategy());
+    return runner;
+  }
+
+  private <InputT extends BoundedSource<OutputT>, OutputT>
+      BoundedSourceRunner<InputT, OutputT> createBoundedSourceRunner(
+          RunnerApi.FunctionSpec functionSpec,
+          Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap) {
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    BoundedSourceRunner<InputT, OutputT> runner =
+        new BoundedSourceRunner(options, functionSpec, outputMap);
+    return runner;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
index 0e738ac..276a120 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
@@ -79,7 +79,7 @@ public class RegisterHandler {
           processBundleDescriptor.getClass());
       computeIfAbsent(processBundleDescriptor.getId()).complete(processBundleDescriptor);
       for (Map.Entry<String, RunnerApi.Coder> entry
-          : processBundleDescriptor.getCodersMap().entrySet()) {
+          : processBundleDescriptor.getCodersyyyMap().entrySet()) {
         LOG.debug("Registering {} with type {}",
             entry.getKey(),
             entry.getValue().getClass());