You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/08 17:47:00 UTC

[1/3] incubator-beam git commit: This closes #1538

Repository: incubator-beam
Updated Branches:
  refs/heads/master 04a41ee54 -> 692905705


This closes #1538


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/69290570
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/69290570
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/69290570

Branch: refs/heads/master
Commit: 692905705fee3666bdbf85e67e5a037244d668b8
Parents: 04a41ee 4d607b5
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Dec 8 09:35:36 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:35:36 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/examples/WordCount.java     |  2 +-
 .../beam/examples/complete/AutoComplete.java    |  6 +--
 .../apache/beam/examples/complete/TfIdf.java    |  6 +--
 .../examples/complete/TopWikipediaSessions.java |  6 +--
 .../examples/complete/TrafficMaxLaneFlow.java   |  4 +-
 .../beam/examples/complete/TrafficRoutes.java   |  4 +-
 .../examples/cookbook/BigQueryTornadoes.java    |  2 +-
 .../cookbook/CombinePerKeyExamples.java         |  2 +-
 .../beam/examples/cookbook/FilterExamples.java  |  2 +-
 .../examples/cookbook/MaxPerKeyExamples.java    |  2 +-
 .../beam/examples/cookbook/TriggerExample.java  |  4 +-
 .../examples/complete/AutoCompleteTest.java     |  2 +-
 .../beam/examples/complete/game/GameStats.java  |  2 +-
 .../examples/complete/game/LeaderBoard.java     |  4 +-
 .../beam/examples/complete/game/UserScore.java  |  2 +-
 .../complete/game/utils/WriteToBigQuery.java    |  2 +-
 .../game/utils/WriteWindowedToBigQuery.java     |  2 +-
 .../apache/beam/runners/apex/ApexRunner.java    | 10 ++--
 .../apache/beam/runners/core/AssignWindows.java |  2 +-
 .../core/GroupByKeyViaGroupByKeyOnly.java       |  8 +--
 .../beam/runners/core/SplittableParDo.java      |  6 +--
 .../core/UnboundedReadFromBoundedSource.java    |  2 +-
 .../beam/runners/direct/DirectGroupByKey.java   |  6 +--
 .../runners/direct/ForwardingPTransform.java    |  6 +--
 .../direct/ParDoMultiOverrideFactory.java       |  4 +-
 .../ParDoSingleViaMultiOverrideFactory.java     |  2 +-
 .../direct/TestStreamEvaluatorFactory.java      |  2 +-
 .../runners/direct/ViewEvaluatorFactory.java    |  4 +-
 .../direct/WriteWithShardingFactory.java        |  4 +-
 .../runners/direct/CommittedResultTest.java     |  2 +-
 .../runners/direct/DirectGraphVisitorTest.java  |  2 +-
 .../direct/ForwardingPTransformTest.java        |  4 +-
 .../direct/KeyedPValueTrackingVisitorTest.java  |  4 +-
 .../beam/runners/flink/examples/TFIDF.java      |  6 +--
 .../beam/runners/flink/examples/WordCount.java  |  2 +-
 .../flink/examples/streaming/AutoComplete.java  |  6 +--
 .../apache/beam/runners/flink/FlinkRunner.java  | 14 +++---
 .../beam/runners/dataflow/DataflowRunner.java   | 51 ++++++++++----------
 .../dataflow/internal/AssignWindows.java        |  2 +-
 .../DataflowUnboundedReadFromBoundedSource.java |  2 +-
 .../DataflowPipelineTranslatorTest.java         |  8 +--
 .../runners/dataflow/DataflowRunnerTest.java    |  2 +-
 .../transforms/DataflowGroupByKeyTest.java      |  2 +-
 .../dataflow/transforms/DataflowViewTest.java   |  2 +-
 .../beam/runners/spark/examples/WordCount.java  |  2 +-
 .../apache/beam/runners/spark/io/ConsoleIO.java |  2 +-
 .../beam/runners/spark/io/CreateStream.java     |  2 +-
 .../beam/runners/spark/io/hadoop/HadoopIO.java  |  4 +-
 .../translation/StorageLevelPTransform.java     |  2 +-
 .../util/SinglePrimitiveOutputPTransform.java   |  2 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     |  4 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |  2 +-
 .../org/apache/beam/sdk/io/CountingInput.java   |  4 +-
 .../java/org/apache/beam/sdk/io/PubsubIO.java   |  4 +-
 .../apache/beam/sdk/io/PubsubUnboundedSink.java |  2 +-
 .../beam/sdk/io/PubsubUnboundedSource.java      |  2 +-
 .../main/java/org/apache/beam/sdk/io/Read.java  |  4 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  4 +-
 .../main/java/org/apache/beam/sdk/io/Write.java |  2 +-
 .../apache/beam/sdk/runners/PipelineRunner.java |  2 +-
 .../apache/beam/sdk/testing/GatherAllPanes.java |  2 +-
 .../org/apache/beam/sdk/testing/PAssert.java    | 12 ++---
 .../org/apache/beam/sdk/testing/TestStream.java |  2 +-
 .../beam/sdk/transforms/ApproximateUnique.java  |  4 +-
 .../org/apache/beam/sdk/transforms/Combine.java | 10 ++--
 .../org/apache/beam/sdk/transforms/Count.java   |  2 +-
 .../org/apache/beam/sdk/transforms/Create.java  |  4 +-
 .../apache/beam/sdk/transforms/Distinct.java    |  4 +-
 .../org/apache/beam/sdk/transforms/Filter.java  |  2 +-
 .../beam/sdk/transforms/FlatMapElements.java    |  2 +-
 .../org/apache/beam/sdk/transforms/Flatten.java |  4 +-
 .../apache/beam/sdk/transforms/GroupByKey.java  |  2 +-
 .../org/apache/beam/sdk/transforms/Keys.java    |  2 +-
 .../org/apache/beam/sdk/transforms/KvSwap.java  |  2 +-
 .../org/apache/beam/sdk/transforms/Latest.java  |  4 +-
 .../apache/beam/sdk/transforms/MapElements.java |  2 +-
 .../apache/beam/sdk/transforms/PTransform.java  |  4 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  4 +-
 .../apache/beam/sdk/transforms/Partition.java   |  2 +-
 .../org/apache/beam/sdk/transforms/Regex.java   | 14 +++---
 .../org/apache/beam/sdk/transforms/Sample.java  |  2 +-
 .../org/apache/beam/sdk/transforms/Values.java  |  2 +-
 .../org/apache/beam/sdk/transforms/View.java    | 12 ++---
 .../apache/beam/sdk/transforms/WithKeys.java    |  2 +-
 .../beam/sdk/transforms/WithTimestamps.java     |  2 +-
 .../beam/sdk/transforms/join/CoGroupByKey.java  |  2 +-
 .../beam/sdk/transforms/windowing/Window.java   |  4 +-
 .../org/apache/beam/sdk/util/Reshuffle.java     |  2 +-
 .../java/org/apache/beam/sdk/PipelineTest.java  |  6 +--
 .../beam/sdk/coders/CoderRegistryTest.java      |  4 +-
 .../java/org/apache/beam/sdk/io/WriteTest.java  |  2 +-
 .../sdk/runners/TransformHierarchyTest.java     |  4 +-
 .../beam/sdk/runners/TransformTreeTest.java     |  4 +-
 .../sdk/transforms/FlatMapElementsTest.java     |  2 +-
 .../beam/sdk/transforms/GroupByKeyTest.java     |  2 +-
 .../beam/sdk/transforms/MapElementsTest.java    |  2 +-
 .../beam/sdk/transforms/PTransformTest.java     |  2 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  4 +-
 .../apache/beam/sdk/transforms/ViewTest.java    |  2 +-
 .../display/DisplayDataEvaluatorTest.java       |  2 +-
 .../display/DisplayDataMatchersTest.java        |  6 +--
 .../sdk/transforms/display/DisplayDataTest.java |  2 +-
 .../sdk/transforms/windowing/WindowingTest.java |  2 +-
 .../apache/beam/sdk/util/StringUtilsTest.java   |  6 +--
 .../org/apache/beam/sdk/values/PDoneTest.java   |  4 +-
 .../beam/sdk/extensions/sorter/SortValues.java  |  2 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  8 +--
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    |  4 +-
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |  4 +-
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     |  4 +-
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  |  4 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 10 ++--
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    |  4 +-
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   |  4 +-
 114 files changed, 238 insertions(+), 237 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Rename PTransform.apply to PTransform.expand

Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 9768788..9a6b534 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -700,7 +700,7 @@ public class PubsubIO {
       }
 
       @Override
-      public PCollection<T> apply(PBegin input) {
+      public PCollection<T> expand(PBegin input) {
         if (topic == null && subscription == null) {
           throw new IllegalStateException("Need to set either the topic or the subscription for "
               + "a PubsubIO.Read transform");
@@ -1057,7 +1057,7 @@ public class PubsubIO {
       }
 
       @Override
-      public PDone apply(PCollection<T> input) {
+      public PDone expand(PCollection<T> input) {
         if (topic == null) {
           throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 1a86a1c..1992cb8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -424,7 +424,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
   }
 
   @Override
-  public PDone apply(PCollection<T> input) {
+  public PDone expand(PCollection<T> input) {
     input.apply("PubsubUnboundedSink.Window", Window.<T>into(new GlobalWindows())
         .triggering(
             Repeatedly.forever(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index cea74bc..da3b437 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -1339,7 +1339,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   }
 
   @Override
-  public PCollection<T> apply(PBegin input) {
+  public PCollection<T> expand(PBegin input) {
     return input.getPipeline().begin()
                 .apply(Read.from(new PubsubSource<T>(this)))
                 .apply("PubsubUnboundedSource.Stats",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index f04fbaf..7ec3b0e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -101,7 +101,7 @@ public class Read {
     }
 
     @Override
-    public final PCollection<T> apply(PBegin input) {
+    public final PCollection<T> expand(PBegin input) {
       source.validate();
 
       return PCollection.<T>createPrimitiveOutputInternal(input.getPipeline(),
@@ -169,7 +169,7 @@ public class Read {
     }
 
     @Override
-    public final PCollection<T> apply(PBegin input) {
+    public final PCollection<T> expand(PBegin input) {
       source.validate();
 
       return PCollection.<T>createPrimitiveOutputInternal(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index e967a27..54e73d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -292,7 +292,7 @@ public class TextIO {
       }
 
       @Override
-      public PCollection<T> apply(PBegin input) {
+      public PCollection<T> expand(PBegin input) {
         if (filepattern == null) {
           throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
         }
@@ -742,7 +742,7 @@ public class TextIO {
       }
 
       @Override
-      public PDone apply(PCollection<T> input) {
+      public PDone expand(PCollection<T> input) {
         if (filenamePrefix == null) {
           throw new IllegalStateException(
               "need to set the filename prefix of a TextIO.Write transform");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index 7559fca..bc651d8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -104,7 +104,7 @@ public class Write {
     }
 
     @Override
-    public PDone apply(PCollection<T> input) {
+    public PDone expand(PCollection<T> input) {
       checkArgument(IsBounded.BOUNDED == input.isBounded(),
           "%s can only be applied to a Bounded PCollection",
           Write.class.getSimpleName());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index 77f5128..8604dbc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -73,6 +73,6 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
    */
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
-    return transform.apply(input);
+    return transform.expand(input);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
index 2b311b7..bf2cd0b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
@@ -55,7 +55,7 @@ class GatherAllPanes<T>
   private GatherAllPanes() {}
 
   @Override
-  public PCollection<Iterable<ValueInSingleWindow<T>>> apply(PCollection<T> input) {
+  public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
     WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();
 
     return input

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 7dc78d8..b23f4f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -763,7 +763,7 @@ public class PAssert {
     }
 
     @Override
-    public PCollectionView<ActualT> apply(PBegin input) {
+    public PCollectionView<ActualT> expand(PBegin input) {
       final Coder<T> coder = actual.getCoder();
       return actual
           .apply("FilterActuals", rewindowActuals.<T>prepareActuals())
@@ -833,7 +833,7 @@ public class PAssert {
     }
 
     @Override
-    public PCollection<Iterable<ValueInSingleWindow<T>>> apply(PCollection<T> input) {
+    public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
       final int combinedKey = 42;
 
       // Remove the triggering on both
@@ -925,7 +925,7 @@ public class PAssert {
     }
 
     @Override
-    public PDone apply(PCollection<T> input) {
+    public PDone expand(PCollection<T> input) {
       input
           .apply("GroupGlobally", new GroupGlobally<T>(rewindowingStrategy))
           .apply("GetPane", MapElements.via(paneExtractor))
@@ -958,7 +958,7 @@ public class PAssert {
     }
 
     @Override
-    public PDone apply(PCollection<Iterable<T>> input) {
+    public PDone expand(PCollection<Iterable<T>> input) {
       input
           .apply("GroupGlobally", new GroupGlobally<Iterable<T>>(rewindowingStrategy))
           .apply("GetPane", MapElements.via(paneExtractor))
@@ -995,7 +995,7 @@ public class PAssert {
     }
 
     @Override
-    public PDone apply(PBegin input) {
+    public PDone expand(PBegin input) {
       final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual);
 
       input
@@ -1321,7 +1321,7 @@ public class PAssert {
     }
 
     @Override
-    public PCollection<T> apply(PCollection<T> input) {
+    public PCollection<T> expand(PCollection<T> input) {
       return input.apply("FilterWindows", ParDo.of(new Fn()));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index 509bb24..da93cdc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -252,7 +252,7 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
   }
 
   @Override
-  public PCollection<T> apply(PBegin input) {
+  public PCollection<T> expand(PBegin input) {
     throw new IllegalStateException(
         String.format(
             "Pipeline Runner %s does not provide a required override for %s",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
index 71c2158..33820e0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
@@ -204,7 +204,7 @@ public class ApproximateUnique {
     }
 
     @Override
-    public PCollection<Long> apply(PCollection<T> input) {
+    public PCollection<Long> expand(PCollection<T> input) {
       Coder<T> coder = input.getCoder();
       return input.apply(
           Combine.globally(
@@ -271,7 +271,7 @@ public class ApproximateUnique {
     }
 
     @Override
-    public PCollection<KV<K, Long>> apply(PCollection<KV<K, V>> input) {
+    public PCollection<KV<K, Long>> expand(PCollection<KV<K, V>> input) {
       Coder<KV<K, V>> inputCoder = input.getCoder();
       if (!(inputCoder instanceof KvCoder)) {
         throw new IllegalStateException(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 4127d94..3b07260 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1431,7 +1431,7 @@ public class Combine {
     }
 
     @Override
-    public PCollection<OutputT> apply(PCollection<InputT> input) {
+    public PCollection<OutputT> expand(PCollection<InputT> input) {
       PCollection<KV<Void, InputT>> withKeys = input
           .apply(WithKeys.<Void, InputT>of((Void) null))
           .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
@@ -1569,7 +1569,7 @@ public class Combine {
     }
 
     @Override
-    public PCollectionView<OutputT> apply(PCollection<InputT> input) {
+    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
       Globally<InputT, OutputT> combineGlobally =
           Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout);
       if (insertDefault) {
@@ -1866,7 +1866,7 @@ public class Combine {
     }
 
     @Override
-    public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) {
+    public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
       return input
           .apply(GroupByKey.<K, InputT>create(fewKeys))
           .apply(Combine.<K, InputT, OutputT>groupedValues(fn, fnDisplayData)
@@ -1901,7 +1901,7 @@ public class Combine {
     }
 
     @Override
-    public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) {
+    public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
       return applyHelper(input);
     }
 
@@ -2388,7 +2388,7 @@ public class Combine {
     }
 
     @Override
-    public PCollection<KV<K, OutputT>> apply(
+    public PCollection<KV<K, OutputT>> expand(
         PCollection<? extends KV<K, ? extends Iterable<InputT>>> input) {
 
       PCollection<KV<K, OutputT>> output = input.apply(ParDo.of(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index b393a30..9101996 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -103,7 +103,7 @@ public class Count {
     public PerElement() { }
 
     @Override
-    public PCollection<KV<T, Long>> apply(PCollection<T> input) {
+    public PCollection<KV<T, Long>> expand(PCollection<T> input) {
       return
           input
           .apply("Init", MapElements.via(new SimpleFunction<T, KV<T, Void>>() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 7cd4711..a48136f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -240,7 +240,7 @@ public class Create<T> {
     }
 
     @Override
-    public PCollection<T> apply(PBegin input) {
+    public PCollection<T> expand(PBegin input) {
       try {
         Coder<T> coder = getDefaultOutputCoder(input);
         try {
@@ -440,7 +440,7 @@ public class Create<T> {
     }
 
     @Override
-    public PCollection<T> apply(PBegin input) {
+    public PCollection<T> expand(PBegin input) {
       try {
         Iterable<T> rawElements =
             Iterables.transform(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
index fba428b..2d08cee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
@@ -82,7 +82,7 @@ public class Distinct<T> extends PTransform<PCollection<T>,
   }
 
   @Override
-  public PCollection<T> apply(PCollection<T> in) {
+  public PCollection<T> expand(PCollection<T> in) {
     return in
         .apply("CreateIndex", MapElements.via(new SimpleFunction<T, KV<T, Void>>() {
           @Override
@@ -121,7 +121,7 @@ public class Distinct<T> extends PTransform<PCollection<T>,
     }
 
     @Override
-    public PCollection<T> apply(PCollection<T> in) {
+    public PCollection<T> expand(PCollection<T> in) {
       WithKeys<IdT, T> withKeys = WithKeys.of(fn);
       if (representativeType != null) {
         withKeys = withKeys.withKeyType(representativeType);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index 2d9bdee..a564999 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -201,7 +201,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
   }
 
   @Override
-  public PCollection<T> apply(PCollection<T> input) {
+  public PCollection<T> expand(PCollection<T> input) {
     return input.apply(ParDo.of(new DoFn<T, T>() {
       @ProcessElement
       public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index 4ef809f..c165f7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -129,7 +129,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
   }
 
   @Override
-  public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+  public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
     return input.apply(
         "FlatMap",
         ParDo.of(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index e612836..3ef2e55 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -105,7 +105,7 @@ public class Flatten {
     private FlattenPCollectionList() { }
 
     @Override
-    public PCollection<T> apply(PCollectionList<T> inputs) {
+    public PCollection<T> expand(PCollectionList<T> inputs) {
       WindowingStrategy<?, ?> windowingStrategy;
       IsBounded isBounded = IsBounded.BOUNDED;
       if (!inputs.getAll().isEmpty()) {
@@ -163,7 +163,7 @@ public class Flatten {
       extends PTransform<PCollection<? extends Iterable<T>>, PCollection<T>> {
 
     @Override
-    public PCollection<T> apply(PCollection<? extends Iterable<T>> in) {
+    public PCollection<T> expand(PCollection<? extends Iterable<T>> in) {
       Coder<? extends Iterable<T>> inCoder = in.getCoder();
       if (!(inCoder instanceof IterableLikeCoder)) {
         throw new IllegalArgumentException(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index 1faac59..a339af7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -216,7 +216,7 @@ public class GroupByKey<K, V>
   }
 
   @Override
-  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+  public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
     // This primitive operation groups by the combination of key and window,
     // merging windows as needed, using the windows assigned to the
     // key/value input elements and the window merge operation of the

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
index 2405adf..c6f307d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
@@ -56,7 +56,7 @@ public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>,
   private Keys() { }
 
   @Override
-  public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) {
+  public PCollection<K> expand(PCollection<? extends KV<K, ?>> in) {
     return
         in.apply("Keys", MapElements.via(new SimpleFunction<KV<K, ?>, K>() {
           @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
index 2b81ebf..dbe262b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
@@ -60,7 +60,7 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>,
   private KvSwap() { }
 
   @Override
-  public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) {
+  public PCollection<KV<V, K>> expand(PCollection<KV<K, V>> in) {
     return
         in.apply("KvSwap", MapElements.via(new SimpleFunction<KV<K, V>, KV<V, K>>() {
           @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
index 83cceca..9c2d715 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
@@ -159,7 +159,7 @@ public class Latest {
 
   private static class Globally<T> extends PTransform<PCollection<T>, PCollection<T>> {
     @Override
-    public PCollection<T> apply(PCollection<T> input) {
+    public PCollection<T> expand(PCollection<T> input) {
       Coder<T> inputCoder = input.getCoder();
 
       return input
@@ -178,7 +178,7 @@ public class Latest {
   private static class PerKey<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
     @Override
-    public PCollection<KV<K, V>> apply(PCollection<KV<K, V>> input) {
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
       checkNotNull(input);
       checkArgument(input.getCoder() instanceof KvCoder,
           "Input specifiedCoder must be an instance of KvCoder, but was %s", input.getCoder());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index c109034..421b2ab 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -111,7 +111,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
   }
 
   @Override
-  public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+  public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
     return input.apply(
         "Map",
         ParDo.of(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 83fe577..ce4891d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -129,7 +129,7 @@ import org.apache.beam.sdk.values.TypedPValue;
  * output value as their apply implementation.
  * The majority of PTransforms are
  * implemented as composites of other PTransforms.  Such a PTransform
- * subclass typically just implements {@link #apply}, computing its
+ * subclass typically just implements {@link #expand}, computing its
  * Output value from its {@code InputT} value.  User programs are encouraged to
  * use this mechanism to modularize their own code.  Such composite
  * abstractions get their own name, and navigating through the
@@ -181,7 +181,7 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
    * a new unbound output and register evaluators (via backend-specific
    * registration methods).
    */
-  public abstract OutputT apply(InputT input);
+  public abstract OutputT expand(InputT input);
 
   /**
    * Called before invoking apply (which may be intercepted by the runner) to

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index ba6e644..4f7491e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -795,7 +795,7 @@ public class ParDo {
     }
 
     @Override
-    public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+    public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
       checkArgument(
           !isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner");
       validateWindowType(input, fn);
@@ -1052,7 +1052,7 @@ public class ParDo {
 
 
     @Override
-    public PCollectionTuple apply(PCollection<? extends InputT> input) {
+    public PCollectionTuple expand(PCollection<? extends InputT> input) {
       checkArgument(
           !isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner");
       validateWindowType(input, fn);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
index b525872..e0b2b61 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
@@ -100,7 +100,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
   /////////////////////////////////////////////////////////////////////////////
 
   @Override
-  public PCollectionList<T> apply(PCollection<T> in) {
+  public PCollectionList<T> expand(PCollection<T> in) {
     final TupleTagList outputTags = partitionDoFn.getOutputTags();
 
     PCollectionTuple outputs = in.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
index a94130d..14c5d1b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
@@ -179,7 +179,7 @@ public class Regex {
       this.group = group;
     }
 
-    public PCollection<String> apply(PCollection<String> in) {
+    public PCollection<String> expand(PCollection<String> in) {
       return in.apply(
           ParDo.of(
               new DoFn<String, String>() {
@@ -224,7 +224,7 @@ public class Regex {
       this.valueGroup = valueGroup;
     }
 
-    public PCollection<KV<String, String>> apply(PCollection<String> in) {
+    public PCollection<KV<String, String>> expand(PCollection<String> in) {
       return in.apply(
           ParDo.of(
               new DoFn<String, KV<String, String>>() {
@@ -266,7 +266,7 @@ public class Regex {
       this.group = group;
     }
 
-    public PCollection<String> apply(PCollection<String> in) {
+    public PCollection<String> expand(PCollection<String> in) {
       return in.apply(
           ParDo.of(
               new DoFn<String, String>() {
@@ -312,7 +312,7 @@ public class Regex {
       this.valueGroup = valueGroup;
     }
 
-    public PCollection<KV<String, String>> apply(PCollection<String> in) {
+    public PCollection<KV<String, String>> expand(PCollection<String> in) {
       return in.apply(
           ParDo.of(
               new DoFn<String, KV<String, String>>() {
@@ -354,7 +354,7 @@ public class Regex {
       this.replacement = replacement;
     }
 
-    public PCollection<String> apply(PCollection<String> in) {
+    public PCollection<String> expand(PCollection<String> in) {
       return in.apply(
           ParDo.of(
               new DoFn<String, String>() {
@@ -393,7 +393,7 @@ public class Regex {
       this.replacement = replacement;
     }
 
-    public PCollection<String> apply(PCollection<String> in) {
+    public PCollection<String> expand(PCollection<String> in) {
       return in.apply(
           ParDo.of(
               new DoFn<String, String>() {
@@ -434,7 +434,7 @@ public class Regex {
       this.outputEmpty = outputEmpty;
     }
 
-    public PCollection<String> apply(PCollection<String> in) {
+    public PCollection<String> expand(PCollection<String> in) {
       return in.apply(
           ParDo.of(
               new DoFn<String, String>() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index eca987a..7d4e630 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -143,7 +143,7 @@ public class Sample {
     }
 
     @Override
-    public PCollection<T> apply(PCollection<T> in) {
+    public PCollection<T> expand(PCollection<T> in) {
       PCollectionView<Iterable<T>> iterableView = in.apply(View.<T>asIterable());
       return
           in.getPipeline()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
index d21d100..faf402c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
@@ -56,7 +56,7 @@ public class Values<V> extends PTransform<PCollection<? extends KV<?, V>>,
   private Values() { }
 
   @Override
-  public PCollection<V> apply(PCollection<? extends KV<?, V>> in) {
+  public PCollection<V> expand(PCollection<? extends KV<?, V>> in) {
     return
         in.apply("Values", MapElements.via(new SimpleFunction<KV<?, V>, V>() {
           @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 5fafc0a..126679d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -251,7 +251,7 @@ public class View {
     }
 
     @Override
-    public PCollectionView<List<T>> apply(PCollection<T> input) {
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
       return input.apply(CreatePCollectionView.<T, List<T>>of(PCollectionViews.listView(
           input.getPipeline(), input.getWindowingStrategy(), input.getCoder())));
     }
@@ -277,7 +277,7 @@ public class View {
     }
 
     @Override
-    public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       return input.apply(CreatePCollectionView.<T, Iterable<T>>of(PCollectionViews.iterableView(
           input.getPipeline(), input.getWindowingStrategy(), input.getCoder())));
     }
@@ -334,7 +334,7 @@ public class View {
     }
 
     @Override
-    public PCollectionView<T> apply(PCollection<T> input) {
+    public PCollectionView<T> expand(PCollection<T> input) {
       return input.apply(CreatePCollectionView.<T, T>of(PCollectionViews.singletonView(
           input.getPipeline(),
           input.getWindowingStrategy(),
@@ -364,7 +364,7 @@ public class View {
     }
 
     @Override
-    public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
       return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(
           PCollectionViews.multimapView(
               input.getPipeline(),
@@ -401,7 +401,7 @@ public class View {
     }
 
     @Override
-    public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
       return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(
           PCollectionViews.mapView(
               input.getPipeline(),
@@ -439,7 +439,7 @@ public class View {
     }
 
     @Override
-    public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
+    public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
       return view;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index de28ecb..dd38006 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -111,7 +111,7 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
   }
 
   @Override
-  public PCollection<KV<K, V>> apply(PCollection<V> in) {
+  public PCollection<KV<K, V>> expand(PCollection<V> in) {
     PCollection<KV<K, V>> result =
         in.apply("AddKeys", MapElements.via(new SimpleFunction<V, KV<K, V>>() {
           @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index 64e7c45..387707b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -98,7 +98,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
   }
 
   @Override
-  public PCollection<T> apply(PCollection<T> input) {
+  public PCollection<T> expand(PCollection<T> input) {
     return input.apply(
         "AddTimestamps", ParDo.of(new AddTimestampsDoFn<T>(fn, allowedTimestampSkew)));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
index b760e2c..1d10da4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
@@ -87,7 +87,7 @@ public class CoGroupByKey<K> extends
   private CoGroupByKey() { }
 
   @Override
-  public PCollection<KV<K, CoGbkResult>> apply(
+  public PCollection<KV<K, CoGbkResult>> expand(
       KeyedPCollectionTuple<K> input) {
     if (input.isEmpty()) {
       throw new IllegalArgumentException(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 5607762..0c430d0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -445,7 +445,7 @@ public class Window {
     }
 
     @Override
-    public PCollection<T> apply(PCollection<T> input) {
+    public PCollection<T> expand(PCollection<T> input) {
       WindowingStrategy<?, ?> outputStrategy =
           getOutputStrategyInternal(input.getWindowingStrategy());
       return PCollection.createPrimitiveOutputInternal(
@@ -517,7 +517,7 @@ public class Window {
    */
   private static class Remerge<T> extends PTransform<PCollection<T>, PCollection<T>> {
     @Override
-    public PCollection<T> apply(PCollection<T> input) {
+    public PCollection<T> expand(PCollection<T> input) {
       WindowingStrategy<?, ?> outputWindowingStrategy = getOutputWindowing(
           input.getWindowingStrategy());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index ebd44bf..4d86c74 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -50,7 +50,7 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
   }
 
   @Override
-  public PCollection<KV<K, V>> apply(PCollection<KV<K, V>> input) {
+  public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
     WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
     // If the input has already had its windows merged, then the GBK that performed the merge
     // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 0c743f7..fea1554 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -212,7 +212,7 @@ public class PipelineTest {
   private static class IdentityTransform<T extends PInput & POutput>
       extends PTransform<T, T> {
     @Override
-    public T apply(T input) {
+    public T expand(T input) {
       return input;
     }
   }
@@ -247,7 +247,7 @@ public class PipelineTest {
     }
 
     @Override
-    public PCollection<T> apply(PCollectionTuple input) {
+    public PCollection<T> expand(PCollectionTuple input) {
       return input.get(tag);
     }
   }
@@ -281,7 +281,7 @@ public class PipelineTest {
     }
 
     @Override
-    public PCollectionTuple apply(PCollection<T> input) {
+    public PCollectionTuple expand(PCollection<T> input) {
       return PCollectionTuple.of(tag, input);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 530d755..d7badab 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -370,7 +370,7 @@ public class CoderRegistryTest {
 
     @Override
     public PCollection<KV<String, MySerializableGeneric<String>>>
-    apply(PCollection<String> input) {
+    expand(PCollection<String> input) {
       return input.apply(ParDo.of(new OutputDoFn()));
     }
   }
@@ -435,7 +435,7 @@ public class CoderRegistryTest {
 
     @Override
     public PCollection<KV<String, MySerializableGeneric<T>>>
-    apply(PCollection<String> input) {
+    expand(PCollection<String> input) {
       return input.apply(ParDo.of(new OutputDoFn()));
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 5be5ff1..5a7c994 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -123,7 +123,7 @@ public class WriteTest {
     }
 
     @Override
-    public PCollection<T> apply(PCollection<T> input) {
+    public PCollection<T> expand(PCollection<T> input) {
       return input
           .apply(window)
           .apply(ParDo.of(new AddArbitraryKey<T>()))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index ea43188..b0c17d8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -98,7 +98,7 @@ public class TransformHierarchyTest {
             pcList,
             new PTransform<PCollectionList<Long>, PCollection<Long>>() {
               @Override
-              public PCollection<Long> apply(PCollectionList<Long> input) {
+              public PCollection<Long> expand(PCollectionList<Long> input) {
                 return input.get(0);
               }
             });
@@ -130,7 +130,7 @@ public class TransformHierarchyTest {
         pcList,
         new PTransform<PCollectionList<Long>, PCollectionList<Long>>() {
           @Override
-          public PCollectionList<Long> apply(PCollectionList<Long> input) {
+          public PCollectionList<Long> expand(PCollectionList<Long> input) {
             return appended;
           }
         });

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index a81fb1a..d70aa2f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -73,7 +73,7 @@ public class TransformTreeTest {
       extends PTransform<PBegin, PCollectionList<String>> {
 
     @Override
-    public PCollectionList<String> apply(PBegin b) {
+    public PCollectionList<String> expand(PBegin b) {
       // Composite transform: apply delegates to other transformations,
       // here a Create transform.
       PCollection<String> result = b.apply(Create.of("hello", "world"));
@@ -95,7 +95,7 @@ public class TransformTreeTest {
       extends PTransform<PCollection<Integer>, PDone> {
 
     @Override
-    public PDone apply(PCollection<Integer> input) {
+    public PDone expand(PCollection<Integer> input) {
       // Apply an operation so that this is a composite transform.
       input.apply(Count.<Integer>perElement());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
index 311c8de..bb2877e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
@@ -181,7 +181,7 @@ public class FlatMapElementsTest implements Serializable {
       extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Void>>> {
 
     @Override
-    public PCollection<KV<K, Void>> apply(PCollection<KV<K, V>> input) {
+    public PCollection<KV<K, Void>> expand(PCollection<KV<K, V>> input) {
       return input.apply(FlatMapElements.<KV<K, V>, KV<K, Void>>via(
           new SimpleFunction<KV<K, V>, Iterable<KV<K, Void>>>() {
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 535ffec..ebde110 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -306,7 +306,7 @@ public class GroupByKeyTest {
         p.apply(
             new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
               @Override
-              public PCollection<KV<String, Integer>> apply(PBegin input) {
+              public PCollection<KV<String, Integer>> expand(PBegin input) {
                 return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
                         input.getPipeline(),
                         WindowingStrategy.globalDefault(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index 4a34c57..ac3444b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -271,7 +271,7 @@ public class MapElementsTest implements Serializable {
       extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Void>>> {
 
     @Override
-    public PCollection<KV<K, Void>> apply(PCollection<KV<K, V>> input) {
+    public PCollection<KV<K, Void>> expand(PCollection<KV<K, V>> input) {
       return input.apply(MapElements.via(
           new SimpleFunction<KV<K, V>, KV<K, Void>>() {
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PTransformTest.java
index bfe8225..2b7caee 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PTransformTest.java
@@ -36,7 +36,7 @@ public class PTransformTest {
     PTransform<PCollection<String>, PCollection<String>> transform =
         new PTransform<PCollection<String>, PCollection<String>>() {
           @Override
-          public PCollection<String> apply(PCollection<String> begin) {
+          public PCollection<String> expand(PCollection<String> begin) {
             throw new IllegalArgumentException("Should never be applied");
           }
         };

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 9755076..41e795e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -272,7 +272,7 @@ public class ParDoTest implements Serializable {
     private static final TupleTag<Integer> BY3 = new TupleTag<Integer>("by3"){};
 
     @Override
-    public PCollectionTuple apply(PCollection<Integer> input) {
+    public PCollectionTuple expand(PCollection<Integer> input) {
       PCollection<Integer> by2 = input.apply("Filter2s", ParDo.of(new FilterFn(2)));
       PCollection<Integer> by3 = input.apply("Filter3s", ParDo.of(new FilterFn(3)));
       return PCollectionTuple.of(BY2, by2).and(BY3, by3);
@@ -840,7 +840,7 @@ public class ParDoTest implements Serializable {
         .apply(Create.of(inputs))
         .apply("CustomTransform", new PTransform<PCollection<Integer>, PCollection<String>>() {
             @Override
-            public PCollection<String> apply(PCollection<Integer> input) {
+            public PCollection<String> expand(PCollection<Integer> input) {
               return input.apply(ParDo.of(new TestDoFn()));
             }
           });

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index fe2d125..3bf63fd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -1360,7 +1360,7 @@ public class ViewTest implements Serializable {
         .apply(
             new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
               @Override
-              public PCollection<KV<String, Integer>> apply(PBegin input) {
+              public PCollection<KV<String, Integer>> expand(PBegin input) {
                 return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
                         input.getPipeline(),
                         WindowingStrategy.globalDefault(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index 7630779..feff333 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -46,7 +46,7 @@ public class DisplayDataEvaluatorTest implements Serializable {
     PTransform<? super PCollection<String>, ? super POutput> myTransform =
         new PTransform<PCollection<String>, POutput> () {
           @Override
-          public PCollection<String> apply(PCollection<String> input) {
+          public PCollection<String> expand(PCollection<String> input) {
             return input.apply(ParDo.of(new DoFn<String, String>() {
               @ProcessElement
               public void processElement(ProcessContext c) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
index f7f8d40..f892153 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
@@ -78,7 +78,7 @@ public class DisplayDataMatchersTest {
 
     DisplayData data = DisplayData.from(new PTransform<PCollection<String>, PCollection<String>>() {
       @Override
-      public PCollection<String> apply(PCollection<String> input) {
+      public PCollection<String> expand(PCollection<String> input) {
         throw new IllegalArgumentException("Should never be applied");
       }
 
@@ -133,7 +133,7 @@ public class DisplayDataMatchersTest {
     assertFalse(matcher.matches(DisplayData.from(
         new PTransform<PCollection<String>, PCollection<String>>(){
           @Override
-          public PCollection<String> apply(PCollection<String> input) {
+          public PCollection<String> expand(PCollection<String> input) {
             throw new IllegalArgumentException("Should never be applied");
           }
         })));
@@ -206,7 +206,7 @@ public class DisplayDataMatchersTest {
     }
 
     @Override
-    public PCollection<String> apply(PCollection<String> input) {
+    public PCollection<String> expand(PCollection<String> input) {
       throw new IllegalArgumentException("Should never be applied");
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index fccd031..f5c1e73 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -111,7 +111,7 @@ public class DisplayDataTest implements Serializable {
           Instant startTime = defaultStartTime;
 
           @Override
-          public PCollection<String> apply(PCollection<String> begin) {
+          public PCollection<String> expand(PCollection<String> begin) {
             throw new IllegalArgumentException("Should never be applied");
           }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index ab208dd..d4fab17 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -70,7 +70,7 @@ public class WindowingTest implements Serializable {
       this.windowFn = windowFn;
     }
     @Override
-    public PCollection<String> apply(PCollection<String> in) {
+    public PCollection<String> expand(PCollection<String> in) {
       return in.apply("Window",
               Window.<String>into(windowFn)
                   .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
index 042e9e3..1ac176b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
@@ -69,13 +69,13 @@ public class StringUtilsTest {
 
   private class EmbeddedPTransform extends PTransform<PBegin, PDone> {
     @Override
-    public PDone apply(PBegin begin) {
+    public PDone expand(PBegin begin) {
       throw new IllegalArgumentException("Should never be applied");
     }
 
     private class Bound extends PTransform<PBegin, PDone> {
       @Override
-      public PDone apply(PBegin begin) {
+      public PDone expand(PBegin begin) {
         throw new IllegalArgumentException("Should never be applied");
       }
     }
@@ -129,7 +129,7 @@ public class StringUtilsTest {
     AnonymousClass anonymousClassObj = new AnonymousClass() {
       class NamedInnerClass extends PTransform<PBegin, PDone> {
         @Override
-        public PDone apply(PBegin begin) {
+        public PDone expand(PBegin begin) {
           throw new IllegalArgumentException("Should never be applied");
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
index 4000e5d..e5f2019 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
@@ -48,7 +48,7 @@ public class PDoneTest {
    */
   static class EmptyTransform extends PTransform<PBegin, PDone> {
     @Override
-    public PDone apply(PBegin begin) {
+    public PDone expand(PBegin begin) {
       return PDone.in(begin.getPipeline());
     }
   }
@@ -64,7 +64,7 @@ public class PDoneTest {
     }
 
     @Override
-    public PDone apply(PBegin begin) {
+    public PDone expand(PBegin begin) {
       return
           begin
           .apply(Create.of(LINES))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
index 3bd9afa..d1b4d07 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
@@ -74,7 +74,7 @@ public class SortValues<PrimaryKeyT, SecondaryKeyT, ValueT>
   }
 
   @Override
-  public PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> apply(
+  public PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> expand(
       PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> input) {
     return input.apply(
         ParDo.of(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 8bfbd53..f99ca78 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -631,7 +631,7 @@ public class BigQueryIO {
       }
 
       @Override
-      public PCollection<TableRow> apply(PBegin input) {
+      public PCollection<TableRow> expand(PBegin input) {
         String uuid = randomUUIDString();
         final String jobIdToken = "beam_job_" + uuid;
 
@@ -840,7 +840,7 @@ public class BigQueryIO {
     }
 
     @Override
-    public PCollection<T> apply(PCollection<T> input) {
+    public PCollection<T> expand(PCollection<T> input) {
       TupleTag<T> mainOutput = new TupleTag<>();
       TupleTag<Void> cleanupSignal = new TupleTag<>();
       PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>())
@@ -1846,7 +1846,7 @@ public class BigQueryIO {
       }
 
       @Override
-      public PDone apply(PCollection<TableRow> input) {
+      public PDone expand(PCollection<TableRow> input) {
         Pipeline p = input.getPipeline();
         BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
         BigQueryServices bqServices = getBigQueryServices();
@@ -2803,7 +2803,7 @@ public class BigQueryIO {
     }
 
     @Override
-    public PDone apply(PCollection<TableRow> input) {
+    public PDone expand(PCollection<TableRow> input) {
       // A naive implementation would be to simply stream data directly to BigQuery.
       // However, this could occasionally lead to duplicated data, e.g., when
       // a VM that runs this code is restarted and the code is re-run.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 1ee9253..a83784b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -286,7 +286,7 @@ public class BigtableIO {
     }
 
     @Override
-    public PCollection<Row> apply(PBegin input) {
+    public PCollection<Row> expand(PBegin input) {
       BigtableSource source =
           new BigtableSource(new SerializableFunction<PipelineOptions, BigtableService>() {
             @Override
@@ -500,7 +500,7 @@ public class BigtableIO {
     }
 
     @Override
-    public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
+    public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
       input.apply(ParDo.of(new BigtableWriterFn(tableId,
           new SerializableFunction<PipelineOptions, BigtableService>() {
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index d1a9a67..1e8271c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -411,7 +411,7 @@ public class DatastoreV1 {
     }
 
     @Override
-    public PCollection<Entity> apply(PBegin input) {
+    public PCollection<Entity> expand(PBegin input) {
       V1Options v1Options = V1Options.from(getProjectId(), getQuery(),
           getNamespace());
 
@@ -779,7 +779,7 @@ public class DatastoreV1 {
     }
 
     @Override
-    public PDone apply(PCollection<T> input) {
+    public PDone expand(PCollection<T> input) {
       input.apply("Convert to Mutation", MapElements.via(mutationFn))
           .apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(projectId)));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 9644a65..c625287 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -296,7 +296,7 @@ public class JdbcIO {
     }
 
     @Override
-    public PCollection<T> apply(PBegin input) {
+    public PCollection<T> expand(PBegin input) {
       return input
           .apply(Create.of(getQuery()))
           .apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder())
@@ -417,7 +417,7 @@ public class JdbcIO {
     }
 
     @Override
-    public PDone apply(PCollection<T> input) {
+    public PDone expand(PCollection<T> input) {
       input.apply(ParDo.of(new WriteFn<T>(this)));
       return PDone.in(input.getPipeline());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 00b91ad..24fa67d 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -135,7 +135,7 @@ public class JmsIO {
     }
 
     @Override
-    public PCollection<JmsRecord> apply(PBegin input) {
+    public PCollection<JmsRecord> expand(PBegin input) {
       // handles unbounded source to bounded conversion if maxNumRecords is set.
       Unbounded<JmsRecord> unbounded = org.apache.beam.sdk.io.Read.from(createSource());
 
@@ -434,7 +434,7 @@ public class JmsIO {
     }
 
     @Override
-    public PDone apply(PCollection<String> input) {
+    public PDone expand(PCollection<String> input) {
       input.apply(ParDo.of(new JmsWriter(connectionFactory, queue, topic)));
       return PDone.in(input.getPipeline());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index c87d12b..735b8e7 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -445,7 +445,7 @@ public class KafkaIO {
     }
 
     @Override
-    public PCollection<KafkaRecord<K, V>> apply(PBegin input) {
+    public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
      // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
       Unbounded<KafkaRecord<K, V>> unbounded =
           org.apache.beam.sdk.io.Read.from(makeSource());
@@ -544,9 +544,9 @@ public class KafkaIO {
     }
 
     @Override
-    public PCollection<KV<K, V>> apply(PBegin begin) {
+    public PCollection<KV<K, V>> expand(PBegin begin) {
       return typedRead
-          .apply(begin)
+          .expand(begin)
           .apply("Remove Kafka Metadata",
               ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
                 @ProcessElement
@@ -1244,7 +1244,7 @@ public class KafkaIO {
     }
 
     @Override
-    public PDone apply(PCollection<KV<K, V>> input) {
+    public PDone expand(PCollection<KV<K, V>> input) {
       input.apply(ParDo.of(new KafkaWriter<K, V>(
           topic, keyCoder, valueCoder, producerConfig, producerFactoryFnOpt)));
       return PDone.in(input.getPipeline());
@@ -1311,7 +1311,7 @@ public class KafkaIO {
     }
 
     @Override
-    public PDone apply(PCollection<V> input) {
+    public PDone expand(PCollection<V> input) {
       return input
         .apply("Kafka values with default key",
           MapElements.via(new SimpleFunction<V, KV<Void, V>>() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index a482df4..b65e671 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -308,7 +308,7 @@ public class MongoDbGridFSIO {
     }
 
     @Override
-    public PCollection<T> apply(PBegin input) {
+    public PCollection<T> expand(PBegin input) {
       final BoundedGridFSSource source = new BoundedGridFSSource(this, null);
       org.apache.beam.sdk.io.Read.Bounded<ObjectId> objectIds =
           org.apache.beam.sdk.io.Read.from(source);
@@ -621,7 +621,7 @@ public class MongoDbGridFSIO {
     }
 
     @Override
-    public PDone apply(PCollection<T> input) {
+    public PDone expand(PCollection<T> input) {
       input.apply(ParDo.of(new GridFsWriteFn<T>(this)));
       return PDone.in(input.getPipeline());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 70239e6..f539431 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -160,7 +160,7 @@ public class MongoDbIO {
     }
 
     @Override
-    public PCollection<Document> apply(PBegin input) {
+    public PCollection<Document> expand(PBegin input) {
       return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedMongoDbSource(this)));
     }
 
@@ -441,7 +441,7 @@ public class MongoDbIO {
     }
 
     @Override
-    public PDone apply(PCollection<Document> input) {
+    public PDone expand(PCollection<Document> input) {
       input.apply(ParDo.of(new WriteFn(this)));
       return PDone.in(input.getPipeline());
     }


[3/3] incubator-beam git commit: Rename PTransform.apply to PTransform.expand

Posted by ke...@apache.org.
Rename PTransform.apply to PTransform.expand


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d607b5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d607b5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d607b5a

Branch: refs/heads/master
Commit: 4d607b5a594bbf2be76626200d989a9e65ba3da9
Parents: 04a41ee
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 13:33:04 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:35:36 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/examples/WordCount.java     |  2 +-
 .../beam/examples/complete/AutoComplete.java    |  6 +--
 .../apache/beam/examples/complete/TfIdf.java    |  6 +--
 .../examples/complete/TopWikipediaSessions.java |  6 +--
 .../examples/complete/TrafficMaxLaneFlow.java   |  4 +-
 .../beam/examples/complete/TrafficRoutes.java   |  4 +-
 .../examples/cookbook/BigQueryTornadoes.java    |  2 +-
 .../cookbook/CombinePerKeyExamples.java         |  2 +-
 .../beam/examples/cookbook/FilterExamples.java  |  2 +-
 .../examples/cookbook/MaxPerKeyExamples.java    |  2 +-
 .../beam/examples/cookbook/TriggerExample.java  |  4 +-
 .../examples/complete/AutoCompleteTest.java     |  2 +-
 .../beam/examples/complete/game/GameStats.java  |  2 +-
 .../examples/complete/game/LeaderBoard.java     |  4 +-
 .../beam/examples/complete/game/UserScore.java  |  2 +-
 .../complete/game/utils/WriteToBigQuery.java    |  2 +-
 .../game/utils/WriteWindowedToBigQuery.java     |  2 +-
 .../apache/beam/runners/apex/ApexRunner.java    | 10 ++--
 .../apache/beam/runners/core/AssignWindows.java |  2 +-
 .../core/GroupByKeyViaGroupByKeyOnly.java       |  8 +--
 .../beam/runners/core/SplittableParDo.java      |  6 +--
 .../core/UnboundedReadFromBoundedSource.java    |  2 +-
 .../beam/runners/direct/DirectGroupByKey.java   |  6 +--
 .../runners/direct/ForwardingPTransform.java    |  6 +--
 .../direct/ParDoMultiOverrideFactory.java       |  4 +-
 .../ParDoSingleViaMultiOverrideFactory.java     |  2 +-
 .../direct/TestStreamEvaluatorFactory.java      |  2 +-
 .../runners/direct/ViewEvaluatorFactory.java    |  4 +-
 .../direct/WriteWithShardingFactory.java        |  4 +-
 .../runners/direct/CommittedResultTest.java     |  2 +-
 .../runners/direct/DirectGraphVisitorTest.java  |  2 +-
 .../direct/ForwardingPTransformTest.java        |  4 +-
 .../direct/KeyedPValueTrackingVisitorTest.java  |  4 +-
 .../beam/runners/flink/examples/TFIDF.java      |  6 +--
 .../beam/runners/flink/examples/WordCount.java  |  2 +-
 .../flink/examples/streaming/AutoComplete.java  |  6 +--
 .../apache/beam/runners/flink/FlinkRunner.java  | 14 +++---
 .../beam/runners/dataflow/DataflowRunner.java   | 51 ++++++++++----------
 .../dataflow/internal/AssignWindows.java        |  2 +-
 .../DataflowUnboundedReadFromBoundedSource.java |  2 +-
 .../DataflowPipelineTranslatorTest.java         |  8 +--
 .../runners/dataflow/DataflowRunnerTest.java    |  2 +-
 .../transforms/DataflowGroupByKeyTest.java      |  2 +-
 .../dataflow/transforms/DataflowViewTest.java   |  2 +-
 .../beam/runners/spark/examples/WordCount.java  |  2 +-
 .../apache/beam/runners/spark/io/ConsoleIO.java |  2 +-
 .../beam/runners/spark/io/CreateStream.java     |  2 +-
 .../beam/runners/spark/io/hadoop/HadoopIO.java  |  4 +-
 .../translation/StorageLevelPTransform.java     |  2 +-
 .../util/SinglePrimitiveOutputPTransform.java   |  2 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     |  4 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |  2 +-
 .../org/apache/beam/sdk/io/CountingInput.java   |  4 +-
 .../java/org/apache/beam/sdk/io/PubsubIO.java   |  4 +-
 .../apache/beam/sdk/io/PubsubUnboundedSink.java |  2 +-
 .../beam/sdk/io/PubsubUnboundedSource.java      |  2 +-
 .../main/java/org/apache/beam/sdk/io/Read.java  |  4 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  4 +-
 .../main/java/org/apache/beam/sdk/io/Write.java |  2 +-
 .../apache/beam/sdk/runners/PipelineRunner.java |  2 +-
 .../apache/beam/sdk/testing/GatherAllPanes.java |  2 +-
 .../org/apache/beam/sdk/testing/PAssert.java    | 12 ++---
 .../org/apache/beam/sdk/testing/TestStream.java |  2 +-
 .../beam/sdk/transforms/ApproximateUnique.java  |  4 +-
 .../org/apache/beam/sdk/transforms/Combine.java | 10 ++--
 .../org/apache/beam/sdk/transforms/Count.java   |  2 +-
 .../org/apache/beam/sdk/transforms/Create.java  |  4 +-
 .../apache/beam/sdk/transforms/Distinct.java    |  4 +-
 .../org/apache/beam/sdk/transforms/Filter.java  |  2 +-
 .../beam/sdk/transforms/FlatMapElements.java    |  2 +-
 .../org/apache/beam/sdk/transforms/Flatten.java |  4 +-
 .../apache/beam/sdk/transforms/GroupByKey.java  |  2 +-
 .../org/apache/beam/sdk/transforms/Keys.java    |  2 +-
 .../org/apache/beam/sdk/transforms/KvSwap.java  |  2 +-
 .../org/apache/beam/sdk/transforms/Latest.java  |  4 +-
 .../apache/beam/sdk/transforms/MapElements.java |  2 +-
 .../apache/beam/sdk/transforms/PTransform.java  |  4 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  4 +-
 .../apache/beam/sdk/transforms/Partition.java   |  2 +-
 .../org/apache/beam/sdk/transforms/Regex.java   | 14 +++---
 .../org/apache/beam/sdk/transforms/Sample.java  |  2 +-
 .../org/apache/beam/sdk/transforms/Values.java  |  2 +-
 .../org/apache/beam/sdk/transforms/View.java    | 12 ++---
 .../apache/beam/sdk/transforms/WithKeys.java    |  2 +-
 .../beam/sdk/transforms/WithTimestamps.java     |  2 +-
 .../beam/sdk/transforms/join/CoGroupByKey.java  |  2 +-
 .../beam/sdk/transforms/windowing/Window.java   |  4 +-
 .../org/apache/beam/sdk/util/Reshuffle.java     |  2 +-
 .../java/org/apache/beam/sdk/PipelineTest.java  |  6 +--
 .../beam/sdk/coders/CoderRegistryTest.java      |  4 +-
 .../java/org/apache/beam/sdk/io/WriteTest.java  |  2 +-
 .../sdk/runners/TransformHierarchyTest.java     |  4 +-
 .../beam/sdk/runners/TransformTreeTest.java     |  4 +-
 .../sdk/transforms/FlatMapElementsTest.java     |  2 +-
 .../beam/sdk/transforms/GroupByKeyTest.java     |  2 +-
 .../beam/sdk/transforms/MapElementsTest.java    |  2 +-
 .../beam/sdk/transforms/PTransformTest.java     |  2 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  4 +-
 .../apache/beam/sdk/transforms/ViewTest.java    |  2 +-
 .../display/DisplayDataEvaluatorTest.java       |  2 +-
 .../display/DisplayDataMatchersTest.java        |  6 +--
 .../sdk/transforms/display/DisplayDataTest.java |  2 +-
 .../sdk/transforms/windowing/WindowingTest.java |  2 +-
 .../apache/beam/sdk/util/StringUtilsTest.java   |  6 +--
 .../org/apache/beam/sdk/values/PDoneTest.java   |  4 +-
 .../beam/sdk/extensions/sorter/SortValues.java  |  2 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  8 +--
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    |  4 +-
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |  4 +-
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     |  4 +-
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  |  4 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 10 ++--
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    |  4 +-
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   |  4 +-
 114 files changed, 238 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index 5be0ddc..d4da542 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -126,7 +126,7 @@ public class WordCount {
   public static class CountWords extends PTransform<PCollection<String>,
       PCollection<KV<String, Long>>> {
     @Override
-    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
 
       // Convert lines of text into individual words.
       PCollection<String> words = lines.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index c556e3f..31b06c9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -113,7 +113,7 @@ public class AutoComplete {
     }
 
     @Override
-    public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) {
+    public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) {
       PCollection<CompletionCandidate> candidates = input
         // First count how often each token appears.
         .apply(new Count.PerElement<String>())
@@ -154,7 +154,7 @@ public class AutoComplete {
     }
 
     @Override
-    public PCollection<KV<String, List<CompletionCandidate>>> apply(
+    public PCollection<KV<String, List<CompletionCandidate>>> expand(
         PCollection<CompletionCandidate> input) {
       return input
         // For each completion candidate, map it to all prefixes.
@@ -209,7 +209,7 @@ public class AutoComplete {
     }
 
     @Override
-    public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
+    public PCollectionList<KV<String, List<CompletionCandidate>>> expand(
           PCollection<CompletionCandidate> input) {
         if (minPrefix > 10) {
           // Base case, partitioning to return the output in the expected format.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index edf48e7..ea015ae 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -159,7 +159,7 @@ public class TfIdf {
     }
 
     @Override
-    public PCollection<KV<URI, String>> apply(PBegin input) {
+    public PCollection<KV<URI, String>> expand(PBegin input) {
       Pipeline pipeline = input.getPipeline();
 
       // Create one TextIO.Read transform for each document
@@ -200,7 +200,7 @@ public class TfIdf {
     public ComputeTfIdf() { }
 
     @Override
-    public PCollection<KV<String, KV<URI, Double>>> apply(
+    public PCollection<KV<String, KV<URI, Double>>> expand(
       PCollection<KV<URI, String>> uriToContent) {
 
       // Compute the total number of documents, and
@@ -390,7 +390,7 @@ public class TfIdf {
     }
 
     @Override
-    public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
+    public PDone expand(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
       return wordToUriAndTfIdf
           .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
             @ProcessElement

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index d57cc3a..df7f81e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -99,7 +99,7 @@ public class TopWikipediaSessions {
   static class ComputeSessions
       extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
     @Override
-    public PCollection<KV<String, Long>> apply(PCollection<String> actions) {
+    public PCollection<KV<String, Long>> expand(PCollection<String> actions) {
       return actions
           .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardHours(1))))
 
@@ -113,7 +113,7 @@ public class TopWikipediaSessions {
   private static class TopPerMonth
       extends PTransform<PCollection<KV<String, Long>>, PCollection<List<KV<String, Long>>>> {
     @Override
-    public PCollection<List<KV<String, Long>>> apply(PCollection<KV<String, Long>> sessions) {
+    public PCollection<List<KV<String, Long>>> expand(PCollection<KV<String, Long>> sessions) {
       return sessions
         .apply(Window.<KV<String, Long>>into(CalendarWindows.months(1)))
 
@@ -154,7 +154,7 @@ public class TopWikipediaSessions {
     }
 
     @Override
-    public PCollection<String> apply(PCollection<TableRow> input) {
+    public PCollection<String> expand(PCollection<TableRow> input) {
       return input
           .apply(ParDo.of(new ExtractUserAndTimestamp()))
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index 0c367d4..c1032b9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -267,7 +267,7 @@ public class TrafficMaxLaneFlow {
   static class MaxLaneFlow
       extends PTransform<PCollection<KV<String, LaneInfo>>, PCollection<TableRow>> {
     @Override
-    public PCollection<TableRow> apply(PCollection<KV<String, LaneInfo>> flowInfo) {
+    public PCollection<TableRow> expand(PCollection<KV<String, LaneInfo>> flowInfo) {
       // stationId, LaneInfo => stationId + max lane flow info
       PCollection<KV<String, LaneInfo>> flowMaxes =
           flowInfo.apply(Combine.<String, LaneInfo>perKey(
@@ -289,7 +289,7 @@ public class TrafficMaxLaneFlow {
     }
 
     @Override
-    public PCollection<String> apply(PBegin begin) {
+    public PCollection<String> expand(PBegin begin) {
       return begin
           .apply(TextIO.Read.from(inputFile))
           .apply(ParDo.of(new ExtractTimestamps()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index 14cee4d..9b5d577 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -274,7 +274,7 @@ public class TrafficRoutes {
   static class TrackSpeed extends
       PTransform<PCollection<KV<String, StationSpeed>>, PCollection<TableRow>> {
     @Override
-    public PCollection<TableRow> apply(PCollection<KV<String, StationSpeed>> stationSpeed) {
+    public PCollection<TableRow> expand(PCollection<KV<String, StationSpeed>> stationSpeed) {
       // Apply a GroupByKey transform to collect a list of all station
       // readings for a given route.
       PCollection<KV<String, Iterable<StationSpeed>>> timeGroup = stationSpeed.apply(
@@ -299,7 +299,7 @@ public class TrafficRoutes {
     }
 
     @Override
-    public PCollection<String> apply(PBegin begin) {
+    public PCollection<String> expand(PBegin begin) {
       return begin
           .apply(TextIO.Read.from(inputFile))
           .apply(ParDo.of(new ExtractTimestamps()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index a4c1a6b..14d0f58 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -108,7 +108,7 @@ public class BigQueryTornadoes {
   static class CountTornadoes
       extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
     @Override
-    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+    public PCollection<TableRow> expand(PCollection<TableRow> rows) {
 
       // row... => month...
       PCollection<Integer> tornadoes = rows.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 93eee15..29655ea 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -125,7 +125,7 @@ public class CombinePerKeyExamples {
   static class PlaysForWord
       extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
     @Override
-    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+    public PCollection<TableRow> expand(PCollection<TableRow> rows) {
 
       // row... => <word, play_name> ...
       PCollection<KV<String, String>> words = rows.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index 6e6452c..fb6b507 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -153,7 +153,7 @@ public class FilterExamples {
 
 
     @Override
-    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+    public PCollection<TableRow> expand(PCollection<TableRow> rows) {
 
       // Extract the mean_temp from each row.
       PCollection<Double> meanTemps = rows.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
index abc10f3..eabc42b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -100,7 +100,7 @@ public class MaxPerKeyExamples {
   static class MaxMeanTemp
       extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
     @Override
-    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+    public PCollection<TableRow> expand(PCollection<TableRow> rows) {
 
       // row... => <month, mean_temp> ...
       PCollection<KV<Integer, Double>> temps = rows.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index d965d4a..bf3afca 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -171,7 +171,7 @@ public class TriggerExample {
     }
 
     @Override
-    public PCollectionList<TableRow> apply(PCollection<KV<String, Integer>> flowInfo) {
+    public PCollectionList<TableRow> expand(PCollection<KV<String, Integer>> flowInfo) {
 
       // Concept #1: The default triggering behavior
       // By default Beam uses a trigger which fires when the watermark has passed the end of the
@@ -332,7 +332,7 @@ public class TriggerExample {
     }
 
     @Override
-    public PCollection<TableRow> apply(PCollection<KV<String, Integer>> flowInfo) {
+    public PCollection<TableRow> expand(PCollection<KV<String, Integer>> flowInfo) {
       PCollection<KV<String, Iterable<Integer>>> flowPerFreeway = flowInfo
           .apply(GroupByKey.<String, Integer>create());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
index 5dbfa70..d7d4dc6 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
@@ -168,7 +168,7 @@ public class AutoCompleteTest implements Serializable {
   private static class ReifyTimestamps<T>
       extends PTransform<PCollection<TimestampedValue<T>>, PCollection<T>> {
     @Override
-    public PCollection<T> apply(PCollection<TimestampedValue<T>> input) {
+    public PCollection<T> expand(PCollection<TimestampedValue<T>> input) {
       return input.apply(ParDo.of(new DoFn<TimestampedValue<T>, T>() {
         @ProcessElement
         public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index f352252..6ad6a23 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -110,7 +110,7 @@ public class GameStats extends LeaderBoard {
     private static final double SCORE_WEIGHT = 2.5;
 
     @Override
-    public PCollection<KV<String, Integer>> apply(PCollection<KV<String, Integer>> userScores) {
+    public PCollection<KV<String, Integer>> expand(PCollection<KV<String, Integer>> userScores) {
 
       // Get the sum of scores for each user.
       PCollection<KV<String, Integer>> sumScores = userScores

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 3275fa0..519bd5f 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -234,7 +234,7 @@ public class LeaderBoard extends HourlyTeamScore {
     }
 
     @Override
-    public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> infos) {
+    public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> infos) {
       return infos.apply("LeaderboardTeamFixedWindows",
           Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
               // We will get early (speculative) results as well as cumulative
@@ -267,7 +267,7 @@ public class LeaderBoard extends HourlyTeamScore {
     }
 
     @Override
-    public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> input) {
+    public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> input) {
       return input.apply("LeaderboardUserGlobalWindow",
           Window.<GameActionInfo>into(new GlobalWindows())
               // Get periodic results every ten minutes.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 2bca7fc..cb81a7e 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -160,7 +160,7 @@ public class UserScore {
     }
 
     @Override
-    public PCollection<KV<String, Integer>> apply(
+    public PCollection<KV<String, Integer>> expand(
         PCollection<GameActionInfo> gameInfo) {
 
       return gameInfo

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index 89fc271..1f33915 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -118,7 +118,7 @@ public class WriteToBigQuery<InputT>
   }
 
   @Override
-  public PDone apply(PCollection<InputT> teamAndScore) {
+  public PDone expand(PCollection<InputT> teamAndScore) {
     return teamAndScore
       .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
       .apply(BigQueryIO.Write

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index 4f2e719..c32289f 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -60,7 +60,7 @@ public class WriteWindowedToBigQuery<T>
   }
 
   @Override
-  public PDone apply(PCollection<T> teamAndScore) {
+  public PDone expand(PCollection<T> teamAndScore) {
     return teamAndScore
       .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
       .apply(BigQueryIO.Write

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 5ce4fef..9507fb9 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -165,7 +165,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     @Override
-    public PCollection<T> apply(PCollection<T> input) {
+    public PCollection<T> expand(PCollection<T> input) {
       WindowingStrategy<?, ?> outputStrategy =
           wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
 
@@ -226,7 +226,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     @Override
-    public PCollectionView<ViewT> apply(PCollection<List<ElemT>> input) {
+    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
       return view;
     }
   }
@@ -252,7 +252,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     @Override
-    public PCollectionView<OutputT> apply(PCollection<InputT> input) {
+    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
       PCollection<OutputT> combined = input
           .apply(Combine.globally(transform.getCombineFn())
               .withoutDefaults().withFanout(transform.getFanout()));
@@ -282,7 +282,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     @Override
-    public PCollectionView<T> apply(PCollection<T> input) {
+    public PCollectionView<T> expand(PCollection<T> input) {
       Combine.Globally<T, T> combine = Combine
           .globally(new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
       if (!transform.hasDefaultValue()) {
@@ -335,7 +335,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     @Override
-    public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(input.getPipeline(),
           input.getWindowingStrategy(), input.getCoder());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
index f2387f5..375932a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
@@ -40,7 +40,7 @@ public class AssignWindows<T, W extends BoundedWindow>
   }
 
   @Override
-  public PCollection<T> apply(PCollection<T> input) {
+  public PCollection<T> expand(PCollection<T> input) {
     return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn)));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
index 43047ca..694c5eb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
@@ -77,7 +77,7 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
   }
 
   @Override
-  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+  public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
     WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
 
     return input
@@ -109,7 +109,7 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     @Override
-    public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(PCollection<KV<K, V>> input) {
+    public PCollection<KV<K, Iterable<WindowedValue<V>>>> expand(PCollection<KV<K, V>> input) {
       return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
     }
@@ -128,7 +128,7 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
           PCollection<KV<K, Iterable<WindowedValue<V>>>>,
           PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
     @Override
-    public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
+    public PCollection<KV<K, Iterable<WindowedValue<V>>>> expand(
         PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
       return input
           .apply(
@@ -225,7 +225,7 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
     }
 
     @Override
-    public PCollection<KV<K, Iterable<V>>> apply(
+    public PCollection<KV<K, Iterable<V>>> expand(
         PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
       @SuppressWarnings("unchecked")
       KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 580e842..0bf882b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -110,7 +110,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
   }
 
   @Override
-  public PCollectionTuple apply(PCollection<InputT> input) {
+  public PCollectionTuple expand(PCollection<InputT> input) {
     return applyTyped(input);
   }
 
@@ -179,7 +179,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
   public static class GBKIntoKeyedWorkItems<KeyT, InputT>
       extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
     @Override
-    public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
+    public PCollection<KeyedWorkItem<KeyT, InputT>> expand(PCollection<KV<KeyT, InputT>> input) {
       return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
     }
@@ -247,7 +247,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     }
 
     @Override
-    public PCollectionTuple apply(
+    public PCollectionTuple expand(
         PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
             input) {
       DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 29dc57e..f3f93e1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -88,7 +88,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
   }
 
   @Override
-  public PCollection<T> apply(PBegin input) {
+  public PCollection<T> expand(PBegin input) {
     return input.getPipeline().apply(
         Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 21776e7..405d913 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -45,7 +45,7 @@ class DirectGroupByKey<K, V>
   }
 
   @Override
-  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+  public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
     @SuppressWarnings("unchecked")
     KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
 
@@ -79,7 +79,7 @@ class DirectGroupByKey<K, V>
   static final class DirectGroupByKeyOnly<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>> {
     @Override
-    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, V>> input) {
+    public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> input) {
       return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
     }
@@ -126,7 +126,7 @@ class DirectGroupByKey<K, V>
     }
 
     @Override
-    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KeyedWorkItem<K, V>> input) {
+    public PCollection<KV<K, Iterable<V>>> expand(PCollection<KeyedWorkItem<K, V>> input) {
       return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(), outputWindowingStrategy, input.isBounded());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
index 77311c2..97c0983 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
@@ -28,15 +28,15 @@ import org.apache.beam.sdk.values.TypedPValue;
 /**
  * A base class for implementing {@link PTransform} overrides, which behave identically to the
  * delegate transform but with overridden methods. Implementors are required to implement
- * {@link #delegate()}, which returns the object to forward calls to, and {@link #apply(PInput)}.
+ * {@link #delegate()}, which returns the object to forward calls to, and {@link #expand(PInput)}.
  */
 public abstract class ForwardingPTransform<InputT extends PInput, OutputT extends POutput>
     extends PTransform<InputT, OutputT> {
   protected abstract PTransform<InputT, OutputT> delegate();
 
   @Override
-  public OutputT apply(InputT input) {
-    return delegate().apply(input);
+  public OutputT expand(InputT input) {
+    return delegate().expand(input);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 9c9256d..8c96e9b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -73,7 +73,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
     }
 
     @Override
-    public PCollectionTuple apply(PCollection<KV<K, InputT>> input) {
+    public PCollectionTuple expand(PCollection<KV<K, InputT>> input) {
 
       PCollectionTuple outputs = input
           .apply("Group by key", GroupByKey.<K, InputT>create())
@@ -106,7 +106,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
       return underlyingParDo.getDefaultOutputCoder(originalInput, output);
     }
 
-    public PCollectionTuple apply(PCollection<? extends KV<K, Iterable<InputT>>> input) {
+    public PCollectionTuple expand(PCollection<? extends KV<K, Iterable<InputT>>> input) {
 
       PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
           input.getPipeline(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
index 7f2de66..10530bb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
@@ -49,7 +49,7 @@ class ParDoSingleViaMultiOverrideFactory<InputT, OutputT>
     }
 
     @Override
-    public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+    public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
 
       // Output tags for ParDo need only be unique up to applied transform
       TupleTag<OutputT> mainOutputTag = new TupleTag<OutputT>(MAIN_OUTPUT_TAG);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 9df7cdc..3601dbc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -170,7 +170,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
       }
 
       @Override
-      public PCollection<T> apply(PBegin input) {
+      public PCollection<T> expand(PBegin input) {
         PipelineRunner<?> runner = input.getPipeline().getRunner();
         checkState(
             runner instanceof DirectRunner,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index b92ade1..460b1c2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -115,7 +115,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
+    public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
       return input.apply(WithKeys.<Void, ElemT>of((Void) null))
           .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
           .apply(GroupByKey.<Void, ElemT>create())
@@ -145,7 +145,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public PCollectionView<ViewT> apply(PCollection<Iterable<ElemT>> input) {
+    public PCollectionView<ViewT> expand(PCollection<Iterable<ElemT>> input) {
       return og.getView();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index cf535cf..3c88337 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -66,7 +66,7 @@ class WriteWithShardingFactory<InputT>
     }
 
     @Override
-    public PDone apply(PCollection<T> input) {
+    public PDone expand(PCollection<T> input) {
       checkArgument(IsBounded.BOUNDED == input.isBounded(),
           "%s can only be applied to a Bounded PCollection",
           getClass().getSimpleName());
@@ -92,7 +92,7 @@ class WriteWithShardingFactory<InputT>
       // without adding a new Write Transform Node, which would be overwritten the same way, leading
       // to an infinite recursion. We cannot modify the number of shards, because that is determined
       // at runtime.
-      return original.apply(resharded);
+      return original.expand(resharded);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index 00dca20..c6986c0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -52,7 +52,7 @@ public class CommittedResultTest implements Serializable {
   private transient AppliedPTransform<?, ?, ?> transform =
       AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() {
         @Override
-        public PDone apply(PBegin begin) {
+        public PDone expand(PBegin begin) {
           throw new IllegalArgumentException("Should never be applied");
         }
       });

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index 5ad278b..b88c9a4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -213,7 +213,7 @@ public class DirectGraphVisitorTest implements Serializable {
         transformed.apply(
             new PTransform<PInput, PDone>() {
               @Override
-              public PDone apply(PInput input) {
+              public PDone expand(PInput input) {
                 return PDone.in(input.getPipeline());
               }
             });

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
index c75adaa..6860a58 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
@@ -66,8 +66,8 @@ public class ForwardingPTransformTest {
     PCollection<Integer> collection = mock(PCollection.class);
     @SuppressWarnings("unchecked")
     PCollection<String> output = mock(PCollection.class);
-    when(delegate.apply(collection)).thenReturn(output);
-    PCollection<String> result = forwarding.apply(collection);
+    when(delegate.expand(collection)).thenReturn(output);
+    PCollection<String> result = forwarding.expand(collection);
     assertThat(result, equalTo(output));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
index cf65936..0852cd3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -163,7 +163,7 @@ public class KeyedPValueTrackingVisitorTest {
 
   private static class PrimitiveKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> {
     @Override
-    public PCollection<K> apply(PCollection<K> input) {
+    public PCollection<K> expand(PCollection<K> input) {
       return PCollection.<K>createPrimitiveOutputInternal(
               input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
           .setCoder(input.getCoder());
@@ -172,7 +172,7 @@ public class KeyedPValueTrackingVisitorTest {
 
   private static class CompositeKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> {
     @Override
-    public PCollection<K> apply(PCollection<K> input) {
+    public PCollection<K> expand(PCollection<K> input) {
       return input.apply(new PrimitiveKeyer<K>()).apply(ParDo.of(new IdentityFn<K>()));
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index b946d98..89e261b 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -176,7 +176,7 @@ public class TFIDF {
     }
 
     @Override
-    public PCollection<KV<URI, String>> apply(PBegin input) {
+    public PCollection<KV<URI, String>> expand(PBegin input) {
       Pipeline pipeline = input.getPipeline();
 
       // Create one TextIO.Read transform for each document
@@ -219,7 +219,7 @@ public class TFIDF {
     public ComputeTfIdf() { }
 
     @Override
-    public PCollection<KV<String, KV<URI, Double>>> apply(
+    public PCollection<KV<String, KV<URI, Double>>> expand(
         PCollection<KV<URI, String>> uriToContent) {
 
       // Compute the total number of documents, and
@@ -419,7 +419,7 @@ public class TFIDF {
     }
 
     @Override
-    public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
+    public PDone expand(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
       return wordToUriAndTfIdf
           .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
             private static final long serialVersionUID = 0;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index c816442..b6b3c1a 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -72,7 +72,7 @@ public class WordCount {
   public static class CountWords extends PTransform<PCollection<String>,
                     PCollection<KV<String, Long>>> {
     @Override
-    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
 
       // Convert lines of text into individual words.
       PCollection<String> words = lines.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 1e0c3ac..3405981 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -82,7 +82,7 @@ public class AutoComplete {
     }
 
     @Override
-    public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) {
+    public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) {
       PCollection<CompletionCandidate> candidates = input
         // First count how often each token appears.
         .apply(new Count.PerElement<String>())
@@ -129,7 +129,7 @@ public class AutoComplete {
     }
 
     @Override
-    public PCollection<KV<String, List<CompletionCandidate>>> apply(
+    public PCollection<KV<String, List<CompletionCandidate>>> expand(
         PCollection<CompletionCandidate> input) {
       return input
         // For each completion candidate, map it to all prefixes.
@@ -192,7 +192,7 @@ public class AutoComplete {
     }
 
     @Override
-    public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
+    public PCollectionList<KV<String, List<CompletionCandidate>>> expand(
           PCollection<CompletionCandidate> input) {
         if (minPrefix > 10) {
           // Base case, partitioning to return the output in the expected format.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 0b92734..7c1284b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -307,7 +307,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     }
 
     @Override
-    public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
       PCollectionView<Map<K, V>> view =
           PCollectionViews.mapView(
               input.getPipeline(),
@@ -352,7 +352,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     }
 
     @Override
-    public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
       PCollectionView<Map<K, Iterable<V>>> view =
           PCollectionViews.multimapView(
               input.getPipeline(),
@@ -392,7 +392,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {}
 
     @Override
-    public PCollectionView<List<T>> apply(PCollection<T> input) {
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
       PCollectionView<List<T>> view =
           PCollectionViews.listView(
               input.getPipeline(),
@@ -423,7 +423,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) { }
 
     @Override
-    public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       PCollectionView<Iterable<T>> view =
           PCollectionViews.iterableView(
               input.getPipeline(),
@@ -465,7 +465,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     }
 
     @Override
-    public PCollectionView<T> apply(PCollection<T> input) {
+    public PCollectionView<T> expand(PCollection<T> input) {
       Combine.Globally<T, T> combine = Combine.globally(
           new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
       if (!transform.hasDefaultValue()) {
@@ -523,7 +523,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     }
 
     @Override
-    public PCollectionView<OutputT> apply(PCollection<InputT> input) {
+    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
       PCollection<OutputT> combined =
           input.apply(Combine.globally(transform.getCombineFn())
               .withoutDefaults()
@@ -620,7 +620,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     }
 
     @Override
-    public PCollectionView<ViewT> apply(PCollection<List<ElemT>> input) {
+    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
       return view;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 40d8948..00c94d0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -756,7 +756,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollection<KV<K1, Iterable<KV<K2, V>>>> apply(PCollection<KV<K1, KV<K2, V>>> input) {
+    public PCollection<KV<K1, Iterable<KV<K2, V>>>> expand(PCollection<KV<K1, KV<K2, V>>> input) {
       PCollection<KV<K1, Iterable<KV<K2, V>>>> rval =
           PCollection.<KV<K1, Iterable<KV<K2, V>>>>createPrimitiveOutputInternal(
           input.getPipeline(),
@@ -814,7 +814,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>> apply(PCollection<T> input) {
+    public PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>> expand(
+        PCollection<T> input) {
       @SuppressWarnings("unchecked")
       Coder<W> windowCoder = (Coder<W>)
           input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -902,7 +903,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<T> apply(PCollection<T> input) {
+    public PCollectionView<T> expand(PCollection<T> input) {
       @SuppressWarnings("unchecked")
       Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>)
           input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -993,7 +994,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(
           input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
       return BatchViewAsList.applyForIterableLike(runner, input, view);
@@ -1097,7 +1098,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<List<T>> apply(PCollection<T> input) {
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
       PCollectionView<List<T>> view = PCollectionViews.listView(
           input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
       return applyForIterableLike(runner, input, view);
@@ -1265,7 +1266,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
       return this.<BoundedWindow>applyInternal(input);
     }
 
@@ -1406,7 +1407,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
       @Override
       public PCollection<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>>
-          apply(PCollection<KV<K, V>> input) {
+      expand(PCollection<KV<K, V>> input) {
 
         @SuppressWarnings("unchecked")
         Coder<W> windowCoder = (Coder<W>)
@@ -1754,7 +1755,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
       return this.<BoundedWindow>applyInternal(input);
     }
 
@@ -2056,13 +2057,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PDone apply(PCollection<T> input) {
+    public PDone expand(PCollection<T> input) {
       if (transform.getSink() instanceof FileBasedSink) {
         FileBasedSink<?> sink = (FileBasedSink<?>) transform.getSink();
         PathValidator validator = runner.options.getPathValidator();
         validator.validateOutputFilePrefixSupported(sink.getBaseOutputFilename());
       }
-      return transform.apply(input);
+      return transform.expand(input);
     }
   }
 
@@ -2071,7 +2072,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   // ================================================================================
 
   /**
-   * Suppress application of {@link PubsubUnboundedSource#apply} in streaming mode so that we
+   * Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we
    * can instead defer to Windmill's implementation.
    */
   private static class StreamingPubsubIORead<T> extends PTransform<PBegin, PCollection<T>> {
@@ -2090,7 +2091,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollection<T> apply(PBegin input) {
+    public PCollection<T> expand(PBegin input) {
       return PCollection.<T>createPrimitiveOutputInternal(
           input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
           .setCoder(transform.getElementCoder());
@@ -2155,7 +2156,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   /**
-   * Suppress application of {@link PubsubUnboundedSink#apply} in streaming mode so that we
+   * Suppress application of {@link PubsubUnboundedSink#expand} in streaming mode so that we
    * can instead defer to Windmill's implementation.
    */
   private static class StreamingPubsubIOWrite<T> extends PTransform<PCollection<T>, PDone> {
@@ -2174,7 +2175,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PDone apply(PCollection<T> input) {
+    public PDone expand(PCollection<T> input) {
       return PDone.in(input.getPipeline());
     }
 
@@ -2252,7 +2253,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public final PCollection<T> apply(PInput input) {
+    public final PCollection<T> expand(PInput input) {
       source.validate();
 
       if (source.requiresDeduping()) {
@@ -2277,7 +2278,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       }
 
       @Override
-      public final PCollection<ValueWithRecordId<T>> apply(PInput input) {
+      public final PCollection<ValueWithRecordId<T>> expand(PInput input) {
         return PCollection.<ValueWithRecordId<T>>createPrimitiveOutputInternal(
             input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED);
       }
@@ -2327,7 +2328,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     // more per-key overhead.
     private static final int NUM_RESHARD_KEYS = 10000;
     @Override
-    public PCollection<T> apply(PCollection<ValueWithRecordId<T>> input) {
+    public PCollection<T> expand(PCollection<ValueWithRecordId<T>> input) {
       return input
           .apply(WithKeys.of(new SerializableFunction<ValueWithRecordId<T>, Integer>() {
                     @Override
@@ -2367,7 +2368,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public final PCollection<T> apply(PBegin input) {
+    public final PCollection<T> expand(PBegin input) {
       source.validate();
 
       return Pipeline.applyTransform(input, new DataflowUnboundedReadFromBoundedSource<>(source))
@@ -2425,7 +2426,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
       PCollectionView<Map<K, V>> view =
           PCollectionViews.mapView(
               input.getPipeline(),
@@ -2470,7 +2471,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
       PCollectionView<Map<K, Iterable<V>>> view =
           PCollectionViews.multimapView(
               input.getPipeline(),
@@ -2511,7 +2512,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     public StreamingViewAsList(DataflowRunner runner, View.AsList<T> transform) {}
 
     @Override
-    public PCollectionView<List<T>> apply(PCollection<T> input) {
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
       PCollectionView<List<T>> view =
           PCollectionViews.listView(
               input.getPipeline(),
@@ -2543,7 +2544,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     public StreamingViewAsIterable(DataflowRunner runner, View.AsIterable<T> transform) { }
 
     @Override
-    public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       PCollectionView<Iterable<T>> view =
           PCollectionViews.iterableView(
               input.getPipeline(),
@@ -2586,7 +2587,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<T> apply(PCollection<T> input) {
+    public PCollectionView<T> expand(PCollection<T> input) {
       Combine.Globally<T, T> combine = Combine.globally(
           new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
       if (!transform.hasDefaultValue()) {
@@ -2644,7 +2645,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<OutputT> apply(PCollection<InputT> input) {
+    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
       PCollection<OutputT> combined =
           input.apply(Combine.<InputT, OutputT>globally(transform.getCombineFn())
               .withoutDefaults()
@@ -2770,7 +2771,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
 
     @Override
-    public OutputT apply(InputT input) {
+    public OutputT expand(InputT input) {
       String mode = input.getPipeline().getOptions().as(StreamingOptions.class).isStreaming()
           ? "streaming" : "batch";
       String name =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
index 62d4aff..68ee7bc 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
@@ -53,7 +53,7 @@ public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>>
   }
 
   @Override
-  public PCollection<T> apply(PCollection<T> input) {
+  public PCollection<T> expand(PCollection<T> input) {
     WindowingStrategy<?, ?> outputStrategy =
         transform.getOutputStrategyInternal(input.getWindowingStrategy());
     if (transform.getWindowFn() != null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index 96a35bc..e1eedd8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -93,7 +93,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin
   }
 
   @Override
-  public PCollection<T> apply(PBegin input) {
+  public PCollection<T> expand(PBegin input) {
     return input.getPipeline().apply(
         Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 95c7132..ac4f2df 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -551,7 +551,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     }
 
     @Override
-    public PCollection<String> apply(PCollection<String> input) {
+    public PCollection<String> expand(PCollection<String> input) {
       return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(),
           WindowingStrategy.globalDefault(),
@@ -585,7 +585,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
       extends PTransform<PCollection<Integer>, PCollection<Integer>> {
 
     @Override
-    public PCollection<Integer> apply(PCollection<Integer> input) {
+    public PCollection<Integer> expand(PCollection<Integer> input) {
       // Apply an operation so that this is a composite transform.
       input.apply(Count.<Integer>perElement());
 
@@ -606,7 +606,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
       extends PTransform<PCollection<Integer>, PDone> {
 
     @Override
-    public PDone apply(PCollection<Integer> input) {
+    public PDone expand(PCollection<Integer> input) {
       // Apply an operation so that this is a composite transform.
       input.apply(Count.<Integer>perElement());
 
@@ -631,7 +631,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     public final TupleTag<Void> doneTag = new TupleTag<>("done");
 
     @Override
-    public PCollectionTuple apply(PCollection<Integer> input) {
+    public PCollectionTuple expand(PCollection<Integer> input) {
       PCollection<Integer> sum = input.apply(Sum.integersGlobally());
 
       // Fails here when attempting to construct a tuple with an unbound object.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 5375c95..1959be5 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -840,7 +840,7 @@ public class DataflowRunnerTest {
     public boolean translated = false;
 
     @Override
-    public PCollection<Integer> apply(PCollection<Integer> input) {
+    public PCollection<Integer> expand(PCollection<Integer> input) {
       return PCollection.<Integer>createPrimitiveOutputInternal(
           input.getPipeline(),
           WindowingStrategy.globalDefault(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
index 67408ae..c9c7806 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -104,7 +104,7 @@ public class DataflowGroupByKeyTest {
         p.apply(
             new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
               @Override
-              public PCollection<KV<String, Integer>> apply(PBegin input) {
+              public PCollection<KV<String, Integer>> expand(PBegin input) {
                 return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
                         input.getPipeline(),
                         WindowingStrategy.globalDefault(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
index b9220af..4558683 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -93,7 +93,7 @@ public class DataflowViewTest {
         .apply(
             new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
               @Override
-              public PCollection<KV<String, Integer>> apply(PBegin input) {
+              public PCollection<KV<String, Integer>> expand(PBegin input) {
                 return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
                         input.getPipeline(),
                         WindowingStrategy.globalDefault(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index d7e5207..b2672b5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -85,7 +85,7 @@ public class WordCount {
   public static class CountWords extends PTransform<PCollection<String>,
       PCollection<KV<String, Long>>> {
     @Override
-    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
 
       // Convert lines of text into individual words.
       PCollection<String> words = lines.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
index b1c567c..0a56633 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
@@ -62,7 +62,7 @@ public final class ConsoleIO {
       }
 
       @Override
-      public PDone apply(PCollection<T> input) {
+      public PDone expand(PCollection<T> input) {
         return PDone.in(input.getPipeline());
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index a08c54e..7ebba90 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -63,7 +63,7 @@ public final class CreateStream<T> {
     }
 
     @Override
-    public PCollection<T> apply(PBegin input) {
+    public PCollection<T> expand(PBegin input) {
       // Spark streaming micro batches are bounded by default
       return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
           WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
index 042c316..f2457ce 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
@@ -94,7 +94,7 @@ public final class HadoopIO {
       }
 
       @Override
-      public PCollection<KV<K, V>> apply(PBegin input) {
+      public PCollection<KV<K, V>> expand(PBegin input) {
         return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
             WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED);
       }
@@ -197,7 +197,7 @@ public final class HadoopIO {
       }
 
       @Override
-      public PDone apply(PCollection<KV<K, V>> input) {
+      public PDone expand(PCollection<KV<K, V>> input) {
         checkNotNull(
             filenamePrefix, "need to set the filename prefix of an HadoopIO.Write transform");
         checkNotNull(formatClass, "need to set the format class of an HadoopIO.Write transform");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
index 6944dbf..30b51e6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.PCollection;
 public final class StorageLevelPTransform extends PTransform<PCollection<?>, PCollection<String>> {
 
   @Override
-  public PCollection<String> apply(PCollection<?> input) {
+  public PCollection<String> expand(PCollection<?> input) {
     return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
         WindowingStrategy.globalDefault(),
         PCollection.IsBounded.BOUNDED);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
index 654614a..7580da7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
@@ -35,7 +35,7 @@ public class SinglePrimitiveOutputPTransform<T> extends PTransform<PInput, PColl
   }
 
   @Override
-  public PCollection<T> apply(PInput input) {
+  public PCollection<T> expand(PInput input) {
     try {
       PCollection<T> collection = PCollection.<T>createPrimitiveOutputInternal(
               input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index bd0c655..01a4cba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -281,7 +281,7 @@ public class AvroIO {
       }
 
       @Override
-      public PCollection<T> apply(PBegin input) {
+      public PCollection<T> expand(PBegin input) {
         if (filepattern == null) {
           throw new IllegalStateException(
               "need to set the filepattern of an AvroIO.Read transform");
@@ -795,7 +795,7 @@ public class AvroIO {
       }
 
       @Override
-      public PDone apply(PCollection<T> input) {
+      public PDone expand(PCollection<T> input) {
         if (filenamePrefix == null) {
           throw new IllegalStateException(
               "need to set the filename prefix of an AvroIO.Write transform");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 123dca8..f2ef358 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -84,7 +84,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
   }
 
   @Override
-  public PCollection<T> apply(PBegin input) {
+  public PCollection<T> expand(PBegin input) {
     PCollection<ValueWithRecordId<T>> read = Pipeline.applyTransform(input,
         Read.from(new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime)));
     if (source.requiresDeduping()) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
index 456d291..3148d8d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
@@ -130,7 +130,7 @@ public class CountingInput {
     }
 
     @Override
-    public PCollection<Long> apply(PBegin begin) {
+    public PCollection<Long> expand(PBegin begin) {
       return begin.apply(Read.from(CountingSource.createSourceForSubrange(startIndex, endIndex)));
     }
 
@@ -236,7 +236,7 @@ public class CountingInput {
 
     @SuppressWarnings("deprecation")
     @Override
-    public PCollection<Long> apply(PBegin begin) {
+    public PCollection<Long> expand(PBegin begin) {
       Unbounded<Long> read =
           Read.from(
               CountingSource.createUnbounded()