You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/04 01:26:14 UTC

[06/19] incubator-beam git commit: Rename DoFn to OldDoFn

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
index 3b314b2..8b00c03 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
@@ -142,9 +142,9 @@ public class DoFnWithContextTest implements Serializable {
   @Test
   public void testDoFnWithContextUsingAggregators() {
     NoOpDoFn<Object, Object> noOpFn = new NoOpDoFn<>();
-    DoFn<Object, Object>.Context context = noOpFn.context();
+    OldDoFn<Object, Object>.Context context = noOpFn.context();
 
-    DoFn<Object, Object> fn = spy(noOpFn);
+    OldDoFn<Object, Object> fn = spy(noOpFn);
     context = spy(context);
 
     @SuppressWarnings("unchecked")
@@ -225,7 +225,7 @@ public class DoFnWithContextTest implements Serializable {
   }
 
   /**
-   * Initialize a test pipeline with the specified {@link DoFn}.
+   * Initialize a test pipeline with the specified {@link OldDoFn}.
    */
   private <InputT, OutputT> TestPipeline createTestPipeline(DoFnWithContext<InputT, OutputT> fn) {
     TestPipeline pipeline = TestPipeline.create();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index 80825cb..b81eedb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -130,7 +130,7 @@ public class FlattenTest implements Serializable {
 
     PCollection<String> output = p
         .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
-        .apply(ParDo.withSideInputs(view).of(new DoFn<Void, String>() {
+        .apply(ParDo.withSideInputs(view).of(new OldDoFn<Void, String>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     for (String side : c.sideInput(view)) {
@@ -339,7 +339,7 @@ public class FlattenTest implements Serializable {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  private static class IdentityFn<T> extends DoFn<T, T> {
+  private static class IdentityFn<T> extends OldDoFn<T, T> {
     @Override
     public void processElement(ProcessContext c) {
       c.output(c.element());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index d6e4589..15c3ba8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.Matchers.empty;
@@ -55,7 +56,6 @@ import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Assert;
@@ -371,7 +371,7 @@ public class GroupByKeyTest {
     pipeline.run();
   }
 
-  private static class AssertTimestamp<K, V> extends DoFn<KV<K, V>, Void> {
+  private static class AssertTimestamp<K, V> extends OldDoFn<KV<K, V>, Void> {
     private final Instant timestamp;
 
     public AssertTimestamp(Instant timestamp) {
@@ -506,7 +506,7 @@ public class GroupByKeyTest {
    * Creates a KV that wraps the original KV together with a random key.
    */
   static class AssignRandomKey
-      extends DoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> {
+      extends OldDoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> {
 
     @Override
     public void processElement(ProcessContext c) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
index 3355aeb..fa2fae9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms;
 import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -74,7 +75,7 @@ public class IntraBundleParallelizationTest {
   /**
    * Introduces a delay in processing, then passes thru elements.
    */
-  private static class DelayFn<T> extends DoFn<T, T> {
+  private static class DelayFn<T> extends OldDoFn<T, T> {
     public static final long DELAY_MS = 25;
 
     @Override
@@ -94,7 +95,7 @@ public class IntraBundleParallelizationTest {
   /**
    * Throws an exception after some number of calls.
    */
-  private static class ExceptionThrowingFn<T> extends DoFn<T, T> {
+  private static class ExceptionThrowingFn<T> extends OldDoFn<T, T> {
     private ExceptionThrowingFn(int numSuccesses) {
       IntraBundleParallelizationTest.numSuccesses.set(numSuccesses);
     }
@@ -120,11 +121,11 @@ public class IntraBundleParallelizationTest {
   /**
    * Measures concurrency of the processElement method.
    */
-  private static class ConcurrencyMeasuringFn<T> extends DoFn<T, T> {
+  private static class ConcurrencyMeasuringFn<T> extends OldDoFn<T, T> {
     @Override
     public void processElement(ProcessContext c) {
       // Synchronize on the class to provide synchronous access irrespective of
-      // how this DoFn is called.
+      // how this OldDoFn is called.
       synchronized (ConcurrencyMeasuringFn.class) {
         concurrentElements++;
         if (concurrentElements > maxDownstreamConcurrency) {
@@ -154,8 +155,8 @@ public class IntraBundleParallelizationTest {
   }
 
   /**
-   * Test that the DoFn is parallelized up the the Max Parallelism factor within a bundle, but not
-   * greater than that amount.
+   * Test that the OldDoFn is parallelized up to the Max Parallelism factor within a bundle, but
+   * not greater than that amount.
    */
   @Test
   @Category(NeedsRunner.class)
@@ -224,7 +225,7 @@ public class IntraBundleParallelizationTest {
 
   @Test
   public void testDisplayData() {
-    DoFn<String, String> fn = new DoFn<String, String>() {
+    OldDoFn<String, String> fn = new OldDoFn<String, String>() {
       @Override
       public void processElement(ProcessContext c) throws Exception {
       }
@@ -248,15 +249,15 @@ public class IntraBundleParallelizationTest {
   /**
    * Runs the provided doFn inside of an {@link IntraBundleParallelization} transform.
    *
-   * <p>This method assumes that the DoFn passed to it will call {@link #startConcurrentCall()}
+   * <p>This method assumes that the OldDoFn passed to it will call {@link #startConcurrentCall()}
    * before processing each elements and {@link #finishConcurrentCall()} after each element.
    *
    * @param numElements the size of the input
    * @param maxParallelism how many threads to execute in parallel
-   * @param doFn the DoFn to execute
-   * @return the maximum observed parallelism of the DoFn
+   * @param doFn the OldDoFn to execute
+   * @return the maximum observed parallelism of the OldDoFn
    */
-  private int run(int numElements, int maxParallelism, DoFn<Integer, Integer> doFn) {
+  private int run(int numElements, int maxParallelism, OldDoFn<Integer, Integer> doFn) {
     Pipeline pipeline = TestPipeline.create();
 
     ArrayList<Integer> data = new ArrayList<>(numElements);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index f18504c..b4751d2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertThat;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
index 226255a..87fa554 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.TestUtils.checkCombineFn;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
index d7ec322..cd03a74 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
@@ -20,10 +20,12 @@ package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.TestUtils.checkCombineFn;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.beam.sdk.transforms.display.DisplayData;
+
 import com.google.common.collect.Lists;
 
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpDoFn.java
index a389fac..5c43755 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpDoFn.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpDoFn.java
@@ -28,35 +28,35 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 
 /**
- * A {@link DoFn} that does nothing with provided elements. Used for testing
- * methods provided by the DoFn abstract class.
+ * An {@link OldDoFn} that does nothing with provided elements. Used for testing
+ * methods provided by the OldDoFn abstract class.
  *
  * @param <InputT> unused.
  * @param <OutputT> unused.
  */
-class NoOpDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
+class NoOpDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
   @Override
-  public void processElement(DoFn<InputT, OutputT>.ProcessContext c) throws Exception {
+  public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws Exception {
   }
 
   /**
    * Returns a new NoOp Context.
    */
-  public DoFn<InputT, OutputT>.Context context() {
+  public OldDoFn<InputT, OutputT>.Context context() {
     return new NoOpDoFnContext();
   }
 
   /**
    * Returns a new NoOp Process Context.
    */
-  public DoFn<InputT, OutputT>.ProcessContext processContext() {
+  public OldDoFn<InputT, OutputT>.ProcessContext processContext() {
     return new NoOpDoFnProcessContext();
   }
 
   /**
-   * A {@link DoFn.Context} that does nothing and returns exclusively null.
+   * An {@link OldDoFn.Context} that does nothing and returns exclusively null.
    */
-  private class NoOpDoFnContext extends DoFn<InputT, OutputT>.Context {
+  private class NoOpDoFnContext extends OldDoFn<InputT, OutputT>.Context {
     @Override
     public PipelineOptions getPipelineOptions() {
       return null;
@@ -82,10 +82,10 @@ class NoOpDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
   }
 
   /**
-   * A {@link DoFn.ProcessContext} that does nothing and returns exclusively
+   * An {@link OldDoFn.ProcessContext} that does nothing and returns exclusively
    * null.
    */
-  private class NoOpDoFnProcessContext extends DoFn<InputT, OutputT>.ProcessContext {
+  private class NoOpDoFnProcessContext extends OldDoFn<InputT, OutputT>.ProcessContext {
     @Override
     public InputT element() {
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
new file mode 100644
index 0000000..9234ccb
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link OldDoFn.Context}.
+ */
+@RunWith(JUnit4.class)
+public class OldDoFnContextTest {
+
+  @Mock
+  private Aggregator<Long, Long> agg;
+
+  private OldDoFn<Object, Object> fn;
+  private OldDoFn<Object, Object>.Context context;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+
+    // Need to be real objects to call the constructor, and to reference the
+    // outer instance of OldDoFn
+    NoOpDoFn<Object, Object> noOpFn = new NoOpDoFn<>();
+    OldDoFn<Object, Object>.Context noOpContext = noOpFn.context();
+
+    fn = spy(noOpFn);
+    context = spy(noOpContext);
+  }
+
+  @Test
+  public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() {
+    Sum.SumLongFn combiner = new Sum.SumLongFn();
+    Aggregator<Long, Long> delegateAggregator =
+        fn.createAggregator("test", combiner);
+
+    when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
+
+    context.setupDelegateAggregators();
+    delegateAggregator.addValue(1L);
+
+    verify(agg).addValue(1L);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
new file mode 100644
index 0000000..49f4366
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
+import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Tests for OldDoFn.
+ */
+@RunWith(JUnit4.class)
+public class OldDoFnTest implements Serializable {
+
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testCreateAggregatorWithCombinerSucceeds() {
+    String name = "testAggregator";
+    Sum.SumLongFn combiner = new Sum.SumLongFn();
+
+    OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
+
+    Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
+
+    assertEquals(name, aggregator.getName());
+    assertEquals(combiner, aggregator.getCombineFn());
+  }
+
+  @Test
+  public void testCreateAggregatorWithNullNameThrowsException() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("name cannot be null");
+
+    OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
+
+    doFn.createAggregator(null, new Sum.SumLongFn());
+  }
+
+  @Test
+  public void testCreateAggregatorWithNullCombineFnThrowsException() {
+    CombineFn<Object, Object, Object> combiner = null;
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("combiner cannot be null");
+
+    OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
+
+    doFn.createAggregator("testAggregator", combiner);
+  }
+
+  @Test
+  public void testCreateAggregatorWithNullSerializableFnThrowsException() {
+    SerializableFunction<Iterable<Object>, Object> combiner = null;
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("combiner cannot be null");
+
+    OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
+
+    doFn.createAggregator("testAggregator", combiner);
+  }
+
+  @Test
+  public void testCreateAggregatorWithSameNameThrowsException() {
+    String name = "testAggregator";
+    CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+
+    OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
+
+    doFn.createAggregator(name, combiner);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot create");
+    thrown.expectMessage(name);
+    thrown.expectMessage("already exists");
+
+    doFn.createAggregator(name, combiner);
+  }
+
+  @Test
+  public void testCreateAggregatorsWithDifferentNamesSucceeds() {
+    String nameOne = "testAggregator";
+    String nameTwo = "aggregatorPrime";
+    CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+
+    OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
+
+    Aggregator<Double, Double> aggregatorOne =
+        doFn.createAggregator(nameOne, combiner);
+    Aggregator<Double, Double> aggregatorTwo =
+        doFn.createAggregator(nameTwo, combiner);
+
+    assertNotEquals(aggregatorOne, aggregatorTwo);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testCreateAggregatorInStartBundleThrows() {
+    TestPipeline p = createTestPipeline(new OldDoFn<String, String>() {
+      @Override
+      public void startBundle(OldDoFn<String, String>.Context c) throws Exception {
+        createAggregator("anyAggregate", new MaxIntegerFn());
+      }
+
+      @Override
+      public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {}
+    });
+
+    thrown.expect(PipelineExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testCreateAggregatorInProcessElementThrows() {
+    TestPipeline p = createTestPipeline(new OldDoFn<String, String>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        createAggregator("anyAggregate", new MaxIntegerFn());
+      }
+    });
+
+    thrown.expect(PipelineExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testCreateAggregatorInFinishBundleThrows() {
+    TestPipeline p = createTestPipeline(new OldDoFn<String, String>() {
+      @Override
+      public void finishBundle(OldDoFn<String, String>.Context c) throws Exception {
+        createAggregator("anyAggregate", new MaxIntegerFn());
+      }
+
+      @Override
+      public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {}
+    });
+
+    thrown.expect(PipelineExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+
+    p.run();
+  }
+
+  /**
+   * Initialize a test pipeline with the specified {@link OldDoFn}.
+   */
+  private <InputT, OutputT> TestPipeline createTestPipeline(OldDoFn<InputT, OutputT> fn) {
+    TestPipeline pipeline = TestPipeline.create();
+    pipeline.apply(Create.of((InputT) null))
+     .apply(ParDo.of(fn));
+
+    return pipeline;
+  }
+
+  @Test
+  public void testPopulateDisplayDataDefaultBehavior() {
+    OldDoFn<String, String> usesDefault =
+        new OldDoFn<String, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {}
+        };
+
+    DisplayData data = DisplayData.from(usesDefault);
+    assertThat(data.items(), empty());
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAggregators() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+
+    CountOddsFn countOdds = new CountOddsFn();
+    pipeline
+        .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100))
+        .apply(ParDo.of(countOdds));
+    PipelineResult result = pipeline.run();
+
+    AggregatorValues<Integer> values = result.getAggregatorValues(countOdds.aggregator);
+    assertThat(values.getValuesAtSteps(),
+        equalTo((Map<String, Integer>) ImmutableMap.<String, Integer>of("ParDo(CountOdds)", 4)));
+  }
+
+  private static class CountOddsFn extends OldDoFn<Integer, Void> {
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      if (c.element() % 2 == 1) {
+        aggregator.addValue(1);
+      }
+    }
+
+    Aggregator<Integer, Integer> aggregator =
+        createAggregator("odds", new SumIntegerFn());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 868270c..0a6eab0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -26,6 +26,7 @@ import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.equalTo;
@@ -43,7 +44,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -59,7 +60,6 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -89,7 +89,9 @@ public class ParDoTest implements Serializable {
   @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
-  private static class PrintingDoFn extends DoFn<String, String> implements RequiresWindowAccess {
+  private static class PrintingOldDoFn extends OldDoFn<String, String> implements
+      RequiresWindowAccess {
+
     @Override
     public void processElement(ProcessContext c) {
       c.output(c.element() + ":" + c.timestamp().getMillis()
@@ -97,17 +99,17 @@ public class ParDoTest implements Serializable {
     }
   }
 
-  static class TestDoFn extends DoFn<Integer, String> {
+  static class TestOldDoFn extends OldDoFn<Integer, String> {
     enum State { UNSTARTED, STARTED, PROCESSING, FINISHED }
     State state = State.UNSTARTED;
 
     final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
     final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>();
 
-    public TestDoFn() {
+    public TestOldDoFn() {
     }
 
-    public TestDoFn(List<PCollectionView<Integer>> sideInputViews,
+    public TestOldDoFn(List<PCollectionView<Integer>> sideInputViews,
                     List<TupleTag<String>> sideOutputTupleTags) {
       this.sideInputViews.addAll(sideInputViews);
       this.sideOutputTupleTags.addAll(sideOutputTupleTags);
@@ -161,9 +163,9 @@ public class ParDoTest implements Serializable {
     }
   }
 
-  static class TestNoOutputDoFn extends DoFn<Integer, String> {
+  static class TestNoOutputDoFn extends OldDoFn<Integer, String> {
     @Override
-    public void processElement(DoFn<Integer, String>.ProcessContext c) throws Exception {}
+    public void processElement(OldDoFn<Integer, String>.ProcessContext c) throws Exception {}
   }
 
   static class TestDoFnWithContext extends DoFnWithContext<Integer, String> {
@@ -229,7 +231,7 @@ public class ParDoTest implements Serializable {
     }
   }
 
-  static class TestStartBatchErrorDoFn extends DoFn<Integer, String> {
+  static class TestStartBatchErrorDoFn extends OldDoFn<Integer, String> {
     @Override
     public void startBundle(Context c) {
       throw new RuntimeException("test error in initialize");
@@ -241,14 +243,14 @@ public class ParDoTest implements Serializable {
     }
   }
 
-  static class TestProcessElementErrorDoFn extends DoFn<Integer, String> {
+  static class TestProcessElementErrorDoFn extends OldDoFn<Integer, String> {
     @Override
     public void processElement(ProcessContext c) {
       throw new RuntimeException("test error in process");
     }
   }
 
-  static class TestFinishBatchErrorDoFn extends DoFn<Integer, String> {
+  static class TestFinishBatchErrorDoFn extends OldDoFn<Integer, String> {
     @Override
     public void processElement(ProcessContext c) {
       // This has to be here.
@@ -260,13 +262,13 @@ public class ParDoTest implements Serializable {
     }
   }
 
-  private static class StrangelyNamedDoer extends DoFn<Integer, String> {
+  private static class StrangelyNamedDoer extends OldDoFn<Integer, String> {
     @Override
     public void processElement(ProcessContext c) {
     }
   }
 
-  static class TestOutputTimestampDoFn extends DoFn<Integer, Integer> {
+  static class TestOutputTimestampDoFn extends OldDoFn<Integer, Integer> {
     @Override
     public void processElement(ProcessContext c) {
       Integer value = c.element();
@@ -274,7 +276,7 @@ public class ParDoTest implements Serializable {
     }
   }
 
-  static class TestShiftTimestampDoFn extends DoFn<Integer, Integer> {
+  static class TestShiftTimestampDoFn extends OldDoFn<Integer, Integer> {
     private Duration allowedTimestampSkew;
     private Duration durationToShift;
 
@@ -297,7 +299,7 @@ public class ParDoTest implements Serializable {
     }
   }
 
-  static class TestFormatTimestampDoFn extends DoFn<Integer, String> {
+  static class TestFormatTimestampDoFn extends OldDoFn<Integer, String> {
     @Override
     public void processElement(ProcessContext c) {
       checkNotNull(c.timestamp());
@@ -318,7 +320,7 @@ public class ParDoTest implements Serializable {
       return PCollectionTuple.of(BY2, by2).and(BY3, by3);
     }
 
-    static class FilterFn extends DoFn<Integer, Integer> {
+    static class FilterFn extends OldDoFn<Integer, Integer> {
       private final int divisor;
 
       FilterFn(int divisor) {
@@ -343,7 +345,7 @@ public class ParDoTest implements Serializable {
 
     PCollection<String> output = pipeline
         .apply(Create.of(inputs))
-        .apply(ParDo.of(new TestDoFn()));
+        .apply(ParDo.of(new TestOldDoFn()));
 
     PAssert.that(output)
         .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
@@ -377,7 +379,7 @@ public class ParDoTest implements Serializable {
 
     PCollection<String> output = pipeline
         .apply(Create.of(inputs).withCoder(VarIntCoder.of()))
-        .apply("TestDoFn", ParDo.of(new TestDoFn()));
+        .apply("TestOldDoFn", ParDo.of(new TestOldDoFn()));
 
     PAssert.that(output)
         .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
@@ -395,7 +397,7 @@ public class ParDoTest implements Serializable {
 
     PCollection<String> output = pipeline
         .apply(Create.of(inputs).withCoder(VarIntCoder.of()))
-        .apply("TestDoFn", ParDo.of(new TestNoOutputDoFn()));
+        .apply("TestOldDoFn", ParDo.of(new TestNoOutputDoFn()));
 
     PAssert.that(output).empty();
 
@@ -418,7 +420,7 @@ public class ParDoTest implements Serializable {
     PCollectionTuple outputs = pipeline
         .apply(Create.of(inputs))
         .apply(ParDo
-               .of(new TestDoFn(
+               .of(new TestOldDoFn(
                    Arrays.<PCollectionView<Integer>>asList(),
                    Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3)))
                .withOutputTags(
@@ -461,7 +463,7 @@ public class ParDoTest implements Serializable {
     PCollectionTuple outputs = pipeline
         .apply(Create.of(inputs))
         .apply(ParDo
-               .of(new TestDoFn(
+               .of(new TestOldDoFn(
                    Arrays.<PCollectionView<Integer>>asList(),
                    Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3)))
                .withOutputTags(
@@ -527,7 +529,7 @@ public class ParDoTest implements Serializable {
     PCollectionTuple outputs = pipeline
         .apply(Create.of(inputs))
         .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
-            .of(new DoFn<Integer, Void>(){
+            .of(new OldDoFn<Integer, Void>(){
                 @Override
                 public void processElement(ProcessContext c) {
                   c.sideOutput(sideOutputTag, c.element());
@@ -550,7 +552,7 @@ public class ParDoTest implements Serializable {
 
     PCollection<String> output = pipeline
         .apply(Create.of(inputs))
-        .apply(ParDo.of(new TestDoFn(
+        .apply(ParDo.of(new TestOldDoFn(
             Arrays.<PCollectionView<Integer>>asList(),
             Arrays.asList(sideTag))));
 
@@ -569,7 +571,7 @@ public class ParDoTest implements Serializable {
 
     // Success for a total of 1000 outputs.
     input
-        .apply("Success1000", ParDo.of(new DoFn<Integer, String>() {
+        .apply("Success1000", ParDo.of(new OldDoFn<Integer, String>() {
             @Override
             public void processElement(ProcessContext c) {
               TupleTag<String> specialSideTag = new TupleTag<String>(){};
@@ -585,7 +587,7 @@ public class ParDoTest implements Serializable {
 
     // Failure for a total of 1001 outputs.
     input
-        .apply("Failure1001", ParDo.of(new DoFn<Integer, String>() {
+        .apply("Failure1001", ParDo.of(new OldDoFn<Integer, String>() {
             @Override
             public void processElement(ProcessContext c) {
               for (int i = 0; i < 1000; i++) {
@@ -618,7 +620,7 @@ public class ParDoTest implements Serializable {
     PCollection<String> output = pipeline
         .apply(Create.of(inputs))
         .apply(ParDo.withSideInputs(sideInput1, sideInputUnread, sideInput2)
-            .of(new TestDoFn(
+            .of(new TestOldDoFn(
                 Arrays.asList(sideInput1, sideInput2),
                 Arrays.<TupleTag<String>>asList())));
 
@@ -652,7 +654,7 @@ public class ParDoTest implements Serializable {
         .apply(ParDo.withSideInputs(sideInput1)
             .withSideInputs(sideInputUnread)
             .withSideInputs(sideInput2)
-            .of(new TestDoFn(
+            .of(new TestOldDoFn(
                 Arrays.asList(sideInput1, sideInput2),
                 Arrays.<TupleTag<String>>asList())));
 
@@ -690,7 +692,7 @@ public class ParDoTest implements Serializable {
             .withSideInputs(sideInputUnread)
             .withSideInputs(sideInput2)
             .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
-            .of(new TestDoFn(
+            .of(new TestOldDoFn(
                 Arrays.asList(sideInput1, sideInput2),
                 Arrays.<TupleTag<String>>asList())));
 
@@ -728,7 +730,7 @@ public class ParDoTest implements Serializable {
             .withSideInputs(sideInputUnread)
             .withSideInputs(sideInput2)
             .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
-            .of(new TestDoFn(
+            .of(new TestOldDoFn(
                 Arrays.asList(sideInput1, sideInput2),
                 Arrays.<TupleTag<String>>asList())));
 
@@ -752,7 +754,7 @@ public class ParDoTest implements Serializable {
         .apply(View.<Integer>asSingleton());
 
     pipeline.apply("CreateMain", Create.of(inputs))
-        .apply(ParDo.of(new TestDoFn(
+        .apply(ParDo.of(new TestOldDoFn(
             Arrays.<PCollectionView<Integer>>asList(sideView),
             Arrays.<TupleTag<String>>asList())));
 
@@ -815,18 +817,18 @@ public class ParDoTest implements Serializable {
         .setName("MyInput");
 
     {
-      PCollection<String> output1 = input.apply(ParDo.of(new TestDoFn()));
+      PCollection<String> output1 = input.apply(ParDo.of(new TestOldDoFn()));
       assertEquals("ParDo(Test).out", output1.getName());
     }
 
     {
-      PCollection<String> output2 = input.apply("MyParDo", ParDo.of(new TestDoFn()));
+      PCollection<String> output2 = input.apply("MyParDo", ParDo.of(new TestOldDoFn()));
       assertEquals("MyParDo.out", output2.getName());
     }
 
     {
-      PCollection<String> output4 = input.apply("TestDoFn", ParDo.of(new TestDoFn()));
-      assertEquals("TestDoFn.out", output4.getName());
+      PCollection<String> output4 = input.apply("TestOldDoFn", ParDo.of(new TestOldDoFn()));
+      assertEquals("TestOldDoFn.out", output4.getName());
     }
 
     {
@@ -835,7 +837,7 @@ public class ParDoTest implements Serializable {
           output5.getName());
     }
 
-    assertEquals("ParDo(Printing)", ParDo.of(new PrintingDoFn()).getName());
+    assertEquals("ParDo(Printing)", ParDo.of(new PrintingOldDoFn()).getName());
 
     assertEquals(
         "ParMultiDo(SideOutputDummy)",
@@ -855,7 +857,7 @@ public class ParDoTest implements Serializable {
     PCollectionTuple outputs = p
         .apply(Create.of(Arrays.asList(3, -42, 666))).setName("MyInput")
         .apply("MyParDo", ParDo
-               .of(new TestDoFn(
+               .of(new TestOldDoFn(
                    Arrays.<PCollectionView<Integer>>asList(),
                    Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3)))
                .withOutputTags(
@@ -883,7 +885,7 @@ public class ParDoTest implements Serializable {
         .apply("CustomTransform", new PTransform<PCollection<Integer>, PCollection<String>>() {
             @Override
             public PCollection<String> apply(PCollection<Integer> input) {
-              return input.apply(ParDo.of(new TestDoFn()));
+              return input.apply(ParDo.of(new TestOldDoFn()));
             }
           });
 
@@ -920,7 +922,7 @@ public class ParDoTest implements Serializable {
   @Test
   public void testJsonEscaping() {
     // Declare an arbitrary function and make sure we can serialize it
-    DoFn<Integer, Integer> doFn = new DoFn<Integer, Integer>() {
+    OldDoFn<Integer, Integer> doFn = new OldDoFn<Integer, Integer>() {
       @Override
       public void processElement(ProcessContext c) {
         c.output(c.element() + 1);
@@ -973,7 +975,7 @@ public class ParDoTest implements Serializable {
     }
   }
 
-  private static class SideOutputDummyFn extends DoFn<Integer, Integer> {
+  private static class SideOutputDummyFn extends OldDoFn<Integer, Integer> {
     private TupleTag<TestDummy> sideTag;
     public SideOutputDummyFn(TupleTag<TestDummy> sideTag) {
       this.sideTag = sideTag;
@@ -985,7 +987,7 @@ public class ParDoTest implements Serializable {
      }
   }
 
-  private static class MainOutputDummyFn extends DoFn<Integer, TestDummy> {
+  private static class MainOutputDummyFn extends OldDoFn<Integer, TestDummy> {
     private TupleTag<Integer> sideTag;
     public MainOutputDummyFn(TupleTag<Integer> sideTag) {
       this.sideTag = sideTag;
@@ -1167,7 +1169,7 @@ public class ParDoTest implements Serializable {
         .apply(ParDo
             .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
             .of(
-                new DoFn<TestDummy, TestDummy>() {
+                new OldDoFn<TestDummy, TestDummy>() {
                   @Override public void processElement(ProcessContext context) {
                     TestDummy element = context.element();
                     context.output(element);
@@ -1181,7 +1183,7 @@ public class ParDoTest implements Serializable {
     // on a missing coder.
     tuple.get(mainOutputTag)
         .setCoder(TestDummyCoder.of())
-        .apply("Output1", ParDo.of(new DoFn<TestDummy, Integer>() {
+        .apply("Output1", ParDo.of(new OldDoFn<TestDummy, Integer>() {
           @Override public void processElement(ProcessContext context) {
             context.output(1);
           }
@@ -1228,7 +1230,7 @@ public class ParDoTest implements Serializable {
     PCollection<String> output =
         input
         .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)).of(
-            new DoFn<Integer, Integer>() {
+            new OldDoFn<Integer, Integer>() {
               @Override
               public void processElement(ProcessContext c) {
                 c.sideOutputWithTimestamp(
@@ -1349,7 +1351,7 @@ public class ParDoTest implements Serializable {
     PCollection<String> output = pipeline
         .apply(Create.timestamped(TimestampedValue.of("elem", new Instant(1))))
         .apply(Window.<String>into(FixedWindows.of(Duration.millis(1))))
-        .apply(ParDo.of(new DoFn<String, String>() {
+        .apply(ParDo.of(new OldDoFn<String, String>() {
                   @Override
                   public void startBundle(Context c) {
                     c.outputWithTimestamp("start", new Instant(2));
@@ -1368,7 +1370,7 @@ public class ParDoTest implements Serializable {
                     System.out.println("Finish: 3");
                   }
                 }))
-        .apply(ParDo.of(new PrintingDoFn()));
+        .apply(ParDo.of(new PrintingOldDoFn()));
 
     PAssert.that(output).satisfies(new Checker());
 
@@ -1383,7 +1385,7 @@ public class ParDoTest implements Serializable {
     pipeline
         .apply(Create.timestamped(TimestampedValue.of("elem", new Instant(1))))
         .apply(Window.<String>into(FixedWindows.of(Duration.millis(1))))
-        .apply(ParDo.of(new DoFn<String, String>() {
+        .apply(ParDo.of(new OldDoFn<String, String>() {
                   @Override
                   public void startBundle(Context c) {
                     c.output("start");
@@ -1400,7 +1402,7 @@ public class ParDoTest implements Serializable {
   }
   @Test
   public void testDoFnDisplayData() {
-    DoFn<String, String> fn = new DoFn<String, String>() {
+    OldDoFn<String, String> fn = new OldDoFn<String, String>() {
       @Override
       public void processElement(ProcessContext c) {
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
index 243b52b..0cc804e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index fe02573..e7f8cd0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.TestUtils.NO_LINES;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 
 import static com.google.common.base.Preconditions.checkArgument;
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
index a96d19b..fc0e659 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index 738b492..ee240bf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkArgument;
+
 import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -98,12 +99,13 @@ public class ViewTest implements Serializable {
 
     PCollection<Integer> output =
         pipeline.apply("Create123", Create.of(1, 2, 3))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                c.output(c.sideInput(view));
-              }
-            }));
+            .apply("OutputSideInputs",
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.output(c.sideInput(view));
+                  }
+                }));
 
     PAssert.that(output).containsInAnyOrder(47, 47, 47);
 
@@ -124,16 +126,17 @@ public class ViewTest implements Serializable {
 
     PCollection<Integer> output =
         pipeline.apply("Create123", Create.timestamped(
-                                        TimestampedValue.of(1, new Instant(4)),
-                                        TimestampedValue.of(2, new Instant(8)),
-                                        TimestampedValue.of(3, new Instant(12))))
+            TimestampedValue.of(1, new Instant(4)),
+            TimestampedValue.of(2, new Instant(8)),
+            TimestampedValue.of(3, new Instant(12))))
             .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                c.output(c.sideInput(view));
-              }
-            }));
+            .apply("OutputSideInputs",
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.output(c.sideInput(view));
+                  }
+                }));
 
     PAssert.that(output).containsInAnyOrder(47, 47, 48);
 
@@ -150,7 +153,7 @@ public class ViewTest implements Serializable {
             .apply(View.<Integer>asSingleton());
 
     pipeline.apply("Create123", Create.of(1, 2, 3))
-        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
           @Override
           public void processElement(ProcessContext c) {
             c.output(c.sideInput(view));
@@ -175,7 +178,7 @@ public class ViewTest implements Serializable {
     final PCollectionView<Integer> view = oneTwoThree.apply(View.<Integer>asSingleton());
 
     oneTwoThree.apply(
-        "OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+        "OutputSideInputs", ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
           @Override
           public void processElement(ProcessContext c) {
             c.output(c.sideInput(view));
@@ -201,16 +204,17 @@ public class ViewTest implements Serializable {
 
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29, 31))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                checkArgument(c.sideInput(view).size() == 4);
-                checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0));
-                for (Integer i : c.sideInput(view)) {
-                  c.output(i);
-                }
-              }
-            }));
+            .apply("OutputSideInputs",
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    checkArgument(c.sideInput(view).size() == 4);
+                    checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0));
+                    for (Integer i : c.sideInput(view)) {
+                      c.output(i);
+                    }
+                  }
+                }));
 
     PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23);
 
@@ -237,19 +241,21 @@ public class ViewTest implements Serializable {
 
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.timestamped(
-                                              TimestampedValue.of(29, new Instant(1)),
-                                              TimestampedValue.of(35, new Instant(11))))
+            TimestampedValue.of(29, new Instant(1)),
+            TimestampedValue.of(35, new Instant(11))))
             .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                checkArgument(c.sideInput(view).size() == 4);
-                checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0));
-                for (Integer i : c.sideInput(view)) {
-                  c.output(i);
-                }
-              }
-            }));
+            .apply(
+                "OutputSideInputs",
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    checkArgument(c.sideInput(view).size() == 4);
+                    checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0));
+                    for (Integer i : c.sideInput(view)) {
+                      c.output(i);
+                    }
+                  }
+                }));
 
     PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43);
 
@@ -267,16 +273,17 @@ public class ViewTest implements Serializable {
 
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                assertTrue(c.sideInput(view).isEmpty());
-                assertFalse(c.sideInput(view).iterator().hasNext());
-                c.output(1);
-              }
-            }));
+            .apply("OutputSideInputs",
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    assertTrue(c.sideInput(view).isEmpty());
+                    assertFalse(c.sideInput(view).iterator().hasNext());
+                    c.output(1);
+                  }
+                }));
 
-    // Pass at least one value through to guarantee that DoFn executes.
+    // Pass at least one value through to guarantee that OldDoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -292,36 +299,37 @@ public class ViewTest implements Serializable {
 
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                try {
-                  c.sideInput(view).clear();
-                  fail("Expected UnsupportedOperationException on clear()");
-                } catch (UnsupportedOperationException expected) {
-                }
-                try {
-                  c.sideInput(view).add(4);
-                  fail("Expected UnsupportedOperationException on add()");
-                } catch (UnsupportedOperationException expected) {
-                }
-                try {
-                  c.sideInput(view).addAll(new ArrayList<Integer>());
-                  fail("Expected UnsupportedOperationException on addAll()");
-                } catch (UnsupportedOperationException expected) {
-                }
-                try {
-                  c.sideInput(view).remove(0);
-                  fail("Expected UnsupportedOperationException on remove()");
-                } catch (UnsupportedOperationException expected) {
-                }
-                for (Integer i : c.sideInput(view)) {
-                  c.output(i);
-                }
-              }
-            }));
+            .apply("OutputSideInputs",
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    try {
+                      c.sideInput(view).clear();
+                      fail("Expected UnsupportedOperationException on clear()");
+                    } catch (UnsupportedOperationException expected) {
+                    }
+                    try {
+                      c.sideInput(view).add(4);
+                      fail("Expected UnsupportedOperationException on add()");
+                    } catch (UnsupportedOperationException expected) {
+                    }
+                    try {
+                      c.sideInput(view).addAll(new ArrayList<Integer>());
+                      fail("Expected UnsupportedOperationException on addAll()");
+                    } catch (UnsupportedOperationException expected) {
+                    }
+                    try {
+                      c.sideInput(view).remove(0);
+                      fail("Expected UnsupportedOperationException on remove()");
+                    } catch (UnsupportedOperationException expected) {
+                    }
+                    for (Integer i : c.sideInput(view)) {
+                      c.output(i);
+                    }
+                  }
+                }));
 
-    // Pass at least one value through to guarantee that DoFn executes.
+    // Pass at least one value through to guarantee that OldDoFn executes.
     PAssert.that(output).containsInAnyOrder(11);
 
     pipeline.run();
@@ -338,14 +346,15 @@ public class ViewTest implements Serializable {
 
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29, 31))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                for (Integer i : c.sideInput(view)) {
-                  c.output(i);
-                }
-              }
-            }));
+            .apply("OutputSideInputs",
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    for (Integer i : c.sideInput(view)) {
+                      c.output(i);
+                    }
+                  }
+                }));
 
     PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23);
 
@@ -371,18 +380,21 @@ public class ViewTest implements Serializable {
             .apply(View.<Integer>asIterable());
 
     PCollection<Integer> output =
-        pipeline.apply("CreateMainInput", Create.timestamped(
-                                              TimestampedValue.of(29, new Instant(1)),
-                                              TimestampedValue.of(35, new Instant(11))))
+        pipeline
+            .apply("CreateMainInput",
+                Create.timestamped(
+                    TimestampedValue.of(29, new Instant(1)),
+                    TimestampedValue.of(35, new Instant(11))))
             .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                for (Integer i : c.sideInput(view)) {
-                  c.output(i);
-                }
-              }
-            }));
+            .apply("OutputSideInputs",
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    for (Integer i : c.sideInput(view)) {
+                      c.output(i);
+                    }
+                  }
+                }));
 
     PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43);
 
@@ -400,15 +412,16 @@ public class ViewTest implements Serializable {
 
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                assertFalse(c.sideInput(view).iterator().hasNext());
-                c.output(1);
-              }
-            }));
+            .apply("OutputSideInputs",
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    assertFalse(c.sideInput(view).iterator().hasNext());
+                    c.output(1);
+                  }
+                }));
 
-    // Pass at least one value through to guarantee that DoFn executes.
+    // Pass at least one value through to guarantee that OldDoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -424,22 +437,23 @@ public class ViewTest implements Serializable {
 
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                Iterator<Integer> iterator = c.sideInput(view).iterator();
-                while (iterator.hasNext()) {
-                  try {
-                    iterator.remove();
-                    fail("Expected UnsupportedOperationException on remove()");
-                  } catch (UnsupportedOperationException expected) {
+            .apply("OutputSideInputs",
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    Iterator<Integer> iterator = c.sideInput(view).iterator();
+                    while (iterator.hasNext()) {
+                      try {
+                        iterator.remove();
+                        fail("Expected UnsupportedOperationException on remove()");
+                      } catch (UnsupportedOperationException expected) {
+                      }
+                      c.output(iterator.next());
+                    }
                   }
-                  c.output(iterator.next());
-                }
-              }
-            }));
+                }));
 
-    // Pass at least one value through to guarantee that DoFn executes.
+    // Pass at least one value through to guarantee that OldDoFn executes.
     PAssert.that(output).containsInAnyOrder(11);
 
     pipeline.run();
@@ -458,7 +472,7 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) {
@@ -486,7 +500,7 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of(2 /* size */))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() {
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, KV<String, Integer>>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     assertEquals((int) c.element(), c.sideInput(view).size());
@@ -540,7 +554,7 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) {
@@ -577,7 +591,7 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new DoFn<String, KV<String, Integer>>() {
+                                           new OldDoFn<String, KV<String, Integer>>() {
                                              @Override
                                              public void processElement(ProcessContext c) {
                                                for (Integer v :
@@ -615,7 +629,7 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of(1 /* size */, new Instant(16))))
             .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new DoFn<Integer, KV<String, Integer>>() {
+                                           new OldDoFn<Integer, KV<String, Integer>>() {
                                              @Override
                                              public void processElement(ProcessContext c) {
                                                assertEquals((int) c.element(),
@@ -660,7 +674,7 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new DoFn<String, KV<String, Integer>>() {
+                                           new OldDoFn<String, KV<String, Integer>>() {
                                              @Override
                                              public void processElement(ProcessContext c) {
                                                for (Integer v :
@@ -689,17 +703,18 @@ public class ViewTest implements Serializable {
 
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                assertTrue(c.sideInput(view).isEmpty());
-                assertTrue(c.sideInput(view).entrySet().isEmpty());
-                assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
-                c.output(c.element());
-              }
-            }));
+            .apply("OutputSideInputs",
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    assertTrue(c.sideInput(view).isEmpty());
+                    assertTrue(c.sideInput(view).entrySet().isEmpty());
+                    assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
+                    c.output(c.element());
+                  }
+                }));
 
-    // Pass at least one value through to guarantee that DoFn executes.
+    // Pass at least one value through to guarantee that OldDoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -718,17 +733,18 @@ public class ViewTest implements Serializable {
 
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                assertTrue(c.sideInput(view).isEmpty());
-                assertTrue(c.sideInput(view).entrySet().isEmpty());
-                assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
-                c.output(c.element());
-              }
-            }));
+            .apply("OutputSideInputs",
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    assertTrue(c.sideInput(view).isEmpty());
+                    assertTrue(c.sideInput(view).entrySet().isEmpty());
+                    assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
+                    c.output(c.element());
+                  }
+                }));
 
-    // Pass at least one value through to guarantee that DoFn executes.
+    // Pass at least one value through to guarantee that OldDoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -747,7 +763,7 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     try {
@@ -776,7 +792,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that DoFn executes.
+    // Pass at least one value through to guarantee that OldDoFn executes.
     PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
 
     pipeline.run();
@@ -795,7 +811,7 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     c.output(
@@ -822,7 +838,7 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of(2 /* size */))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() {
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, KV<String, Integer>>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     assertEquals((int) c.element(), c.sideInput(view).size());
@@ -854,7 +870,7 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     c.output(
@@ -890,7 +906,7 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new DoFn<String, KV<String, Integer>>() {
+                                           new OldDoFn<String, KV<String, Integer>>() {
                                              @Override
                                              public void processElement(ProcessContext c) {
                                                c.output(KV.of(
@@ -927,7 +943,7 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of(1 /* size */, new Instant(16))))
             .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new DoFn<Integer, KV<String, Integer>>() {
+                                           new OldDoFn<Integer, KV<String, Integer>>() {
                                              @Override
                                              public void processElement(ProcessContext c) {
                                                assertEquals((int) c.element(),
@@ -972,7 +988,7 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new DoFn<String, KV<String, Integer>>() {
+                                           new OldDoFn<String, KV<String, Integer>>() {
                                              @Override
                                              public void processElement(ProcessContext c) {
                                                c.output(KV.of(
@@ -1000,17 +1016,18 @@ public class ViewTest implements Serializable {
 
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                assertTrue(c.sideInput(view).isEmpty());
-                assertTrue(c.sideInput(view).entrySet().isEmpty());
-                assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
-                c.output(c.element());
-              }
-            }));
+            .apply("OutputSideInputs",
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    assertTrue(c.sideInput(view).isEmpty());
+                    assertTrue(c.sideInput(view).entrySet().isEmpty());
+                    assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
+                    c.output(c.element());
+                  }
+                }));
 
-    // Pass at least one value through to guarantee that DoFn executes.
+    // Pass at least one value through to guarantee that OldDoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -1028,17 +1045,18 @@ public class ViewTest implements Serializable {
 
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                assertTrue(c.sideInput(view).isEmpty());
-                assertTrue(c.sideInput(view).entrySet().isEmpty());
-                assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
-                c.output(c.element());
-              }
-            }));
+            .apply("OutputSideInputs",
+                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    assertTrue(c.sideInput(view).isEmpty());
+                    assertTrue(c.sideInput(view).entrySet().isEmpty());
+                    assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
+                    c.output(c.element());
+                  }
+                }));
 
-    // Pass at least one value through to guarantee that DoFn executes.
+    // Pass at least one value through to guarantee that OldDoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -1062,7 +1080,7 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     c.output(
@@ -1093,7 +1111,7 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     try {
@@ -1121,7 +1139,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that DoFn executes.
+    // Pass at least one value through to guarantee that OldDoFn executes.
     PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
 
     pipeline.run();
@@ -1139,12 +1157,14 @@ public class ViewTest implements Serializable {
 
     PCollection<KV<String, Integer>> output =
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
-            .apply("Output", ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                c.output(KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
-              }
-            }));
+            .apply("Output",
+                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.output(KV
+                        .of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
+                  }
+                }));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("apple", 21), KV.of("banana", 3), KV.of("blackberry", 3));
@@ -1173,7 +1193,7 @@ public class ViewTest implements Serializable {
                                        TimestampedValue.of("C", new Instant(7))))
             .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
-                                                  new DoFn<String, String>() {
+                                                  new OldDoFn<String, String>() {
                                                     @Override
                                                     public void processElement(ProcessContext c) {
                                                       c.output(c.element() + c.sideInput(view));
@@ -1206,7 +1226,7 @@ public class ViewTest implements Serializable {
                                        TimestampedValue.of("C", new Instant(7))))
             .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
-                                                  new DoFn<String, String>() {
+                                                  new OldDoFn<String, String>() {
                                                     @Override
                                                     public void processElement(ProcessContext c) {
                                                       c.output(c.element() + c.sideInput(view));
@@ -1237,7 +1257,7 @@ public class ViewTest implements Serializable {
                                        TimestampedValue.of("C", new Instant(7))))
             .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
-                                                  new DoFn<String, String>() {
+                                                  new OldDoFn<String, String>() {
                                                     @Override
                                                     public void processElement(ProcessContext c) {
                                                       c.output(c.element() + c.sideInput(view));
@@ -1267,7 +1287,7 @@ public class ViewTest implements Serializable {
         p.apply("CreateMainInput", Create.of(""))
             .apply(
                 "OutputMainAndSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, String>() {
+                ParDo.withSideInputs(view).of(new OldDoFn<String, String>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     c.output(c.element() + c.sideInput(view));
@@ -1285,7 +1305,7 @@ public class ViewTest implements Serializable {
     Pipeline pipeline = TestPipeline.create();
     final PCollectionView<Iterable<Integer>> view1 =
         pipeline.apply("CreateVoid1", Create.of((Void) null).withCoder(VoidCoder.of()))
-            .apply("OutputOneInteger", ParDo.of(new DoFn<Void, Integer>() {
+            .apply("OutputOneInteger", ParDo.of(new OldDoFn<Void, Integer>() {
               @Override
               public void processElement(ProcessContext c) {
                 c.output(17);
@@ -1297,7 +1317,7 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateVoid2", Create.of((Void) null).withCoder(VoidCoder.of()))
             .apply(
                 "OutputSideInput",
-                ParDo.withSideInputs(view1).of(new DoFn<Void, Iterable<Integer>>() {
+                ParDo.withSideInputs(view1).of(new OldDoFn<Void, Iterable<Integer>>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     c.output(c.sideInput(view1));
@@ -1307,8 +1327,8 @@ public class ViewTest implements Serializable {
 
     PCollection<Integer> output =
         pipeline.apply("CreateVoid3", Create.of((Void) null).withCoder(VoidCoder.of()))
-            .apply(
-                "ReadIterableSideInput", ParDo.withSideInputs(view2).of(new DoFn<Void, Integer>() {
+            .apply("ReadIterableSideInput",
+                ParDo.withSideInputs(view2).of(new OldDoFn<Void, Integer>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     for (Iterable<Integer> input : c.sideInput(view2)) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
index ac67bb4..d2ba452 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
@@ -65,9 +65,9 @@ public class WithTimestampsTest implements Serializable {
          .apply(WithTimestamps.of(timestampFn));
 
     PCollection<KV<String, Instant>> timestampedVals =
-        timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+        timestamped.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() {
           @Override
-          public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c)
+          public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c)
               throws Exception {
             c.output(KV.of(c.element(), c.timestamp()));
           }
@@ -150,9 +150,9 @@ public class WithTimestampsTest implements Serializable {
              WithTimestamps.of(backInTimeFn).withAllowedTimestampSkew(skew.plus(100L)));
 
     PCollection<KV<String, Instant>> timestampedVals =
-        timestampedWithSkew.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+        timestampedWithSkew.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() {
           @Override
-          public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c)
+          public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c)
               throws Exception {
             c.output(KV.of(c.element(), c.timestamp()));
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index ce32b7d..c1848c6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -24,7 +24,7 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PBegin;
@@ -50,7 +50,7 @@ public class DisplayDataEvaluatorTest implements Serializable {
         new PTransform<PCollection<String>, POutput> () {
           @Override
           public PCollection<String> apply(PCollection<String> input) {
-            return input.apply(ParDo.of(new DoFn<String, String>() {
+            return input.apply(ParDo.of(new OldDoFn<String, String>() {
               @Override
               public void processElement(ProcessContext c) throws Exception {
                 c.output(c.element());
@@ -79,7 +79,7 @@ public class DisplayDataEvaluatorTest implements Serializable {
   @Test
   public void testPrimitiveTransform() {
     PTransform<? super PCollection<Integer>, ? super PCollection<Integer>> myTransform = ParDo.of(
-        new DoFn<Integer, Integer>() {
+        new OldDoFn<Integer, Integer>() {
       @Override
       public void processElement(ProcessContext c) throws Exception {}
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
index 07029e9..fa44390 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasName
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;