Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/08 17:47:02 UTC

[3/3] incubator-beam git commit: Rename PTransform.apply to PTransform.expand

Rename PTransform.apply to PTransform.expand
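
For context, a minimal before/after sketch of what this rename means for a
user-defined composite transform (illustrative only, modeled on the WordCount
hunk changed below; ExtractWordsFn and Count.perElement() are names taken from
that example):

    // Before this commit: a composite transform overrode PTransform.apply.
    public static class CountWords
        extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
      @Override
      public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
        return lines
            .apply(ParDo.of(new ExtractWordsFn()))
            .apply(Count.<String>perElement());
      }
    }

    // After this commit: the override point is PTransform.expand. Pipeline
    // construction is unchanged; callers still compose with PCollection.apply(...).
    public static class CountWords
        extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
      @Override
      public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
        return lines
            .apply(ParDo.of(new ExtractWordsFn()))
            .apply(Count.<String>perElement());
      }
    }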


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d607b5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d607b5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d607b5a

Branch: refs/heads/master
Commit: 4d607b5a594bbf2be76626200d989a9e65ba3da9
Parents: 04a41ee
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 13:33:04 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:35:36 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/examples/WordCount.java     |  2 +-
 .../beam/examples/complete/AutoComplete.java    |  6 +--
 .../apache/beam/examples/complete/TfIdf.java    |  6 +--
 .../examples/complete/TopWikipediaSessions.java |  6 +--
 .../examples/complete/TrafficMaxLaneFlow.java   |  4 +-
 .../beam/examples/complete/TrafficRoutes.java   |  4 +-
 .../examples/cookbook/BigQueryTornadoes.java    |  2 +-
 .../cookbook/CombinePerKeyExamples.java         |  2 +-
 .../beam/examples/cookbook/FilterExamples.java  |  2 +-
 .../examples/cookbook/MaxPerKeyExamples.java    |  2 +-
 .../beam/examples/cookbook/TriggerExample.java  |  4 +-
 .../examples/complete/AutoCompleteTest.java     |  2 +-
 .../beam/examples/complete/game/GameStats.java  |  2 +-
 .../examples/complete/game/LeaderBoard.java     |  4 +-
 .../beam/examples/complete/game/UserScore.java  |  2 +-
 .../complete/game/utils/WriteToBigQuery.java    |  2 +-
 .../game/utils/WriteWindowedToBigQuery.java     |  2 +-
 .../apache/beam/runners/apex/ApexRunner.java    | 10 ++--
 .../apache/beam/runners/core/AssignWindows.java |  2 +-
 .../core/GroupByKeyViaGroupByKeyOnly.java       |  8 +--
 .../beam/runners/core/SplittableParDo.java      |  6 +--
 .../core/UnboundedReadFromBoundedSource.java    |  2 +-
 .../beam/runners/direct/DirectGroupByKey.java   |  6 +--
 .../runners/direct/ForwardingPTransform.java    |  6 +--
 .../direct/ParDoMultiOverrideFactory.java       |  4 +-
 .../ParDoSingleViaMultiOverrideFactory.java     |  2 +-
 .../direct/TestStreamEvaluatorFactory.java      |  2 +-
 .../runners/direct/ViewEvaluatorFactory.java    |  4 +-
 .../direct/WriteWithShardingFactory.java        |  4 +-
 .../runners/direct/CommittedResultTest.java     |  2 +-
 .../runners/direct/DirectGraphVisitorTest.java  |  2 +-
 .../direct/ForwardingPTransformTest.java        |  4 +-
 .../direct/KeyedPValueTrackingVisitorTest.java  |  4 +-
 .../beam/runners/flink/examples/TFIDF.java      |  6 +--
 .../beam/runners/flink/examples/WordCount.java  |  2 +-
 .../flink/examples/streaming/AutoComplete.java  |  6 +--
 .../apache/beam/runners/flink/FlinkRunner.java  | 14 +++---
 .../beam/runners/dataflow/DataflowRunner.java   | 51 ++++++++++----------
 .../dataflow/internal/AssignWindows.java        |  2 +-
 .../DataflowUnboundedReadFromBoundedSource.java |  2 +-
 .../DataflowPipelineTranslatorTest.java         |  8 +--
 .../runners/dataflow/DataflowRunnerTest.java    |  2 +-
 .../transforms/DataflowGroupByKeyTest.java      |  2 +-
 .../dataflow/transforms/DataflowViewTest.java   |  2 +-
 .../beam/runners/spark/examples/WordCount.java  |  2 +-
 .../apache/beam/runners/spark/io/ConsoleIO.java |  2 +-
 .../beam/runners/spark/io/CreateStream.java     |  2 +-
 .../beam/runners/spark/io/hadoop/HadoopIO.java  |  4 +-
 .../translation/StorageLevelPTransform.java     |  2 +-
 .../util/SinglePrimitiveOutputPTransform.java   |  2 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     |  4 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |  2 +-
 .../org/apache/beam/sdk/io/CountingInput.java   |  4 +-
 .../java/org/apache/beam/sdk/io/PubsubIO.java   |  4 +-
 .../apache/beam/sdk/io/PubsubUnboundedSink.java |  2 +-
 .../beam/sdk/io/PubsubUnboundedSource.java      |  2 +-
 .../main/java/org/apache/beam/sdk/io/Read.java  |  4 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  4 +-
 .../main/java/org/apache/beam/sdk/io/Write.java |  2 +-
 .../apache/beam/sdk/runners/PipelineRunner.java |  2 +-
 .../apache/beam/sdk/testing/GatherAllPanes.java |  2 +-
 .../org/apache/beam/sdk/testing/PAssert.java    | 12 ++---
 .../org/apache/beam/sdk/testing/TestStream.java |  2 +-
 .../beam/sdk/transforms/ApproximateUnique.java  |  4 +-
 .../org/apache/beam/sdk/transforms/Combine.java | 10 ++--
 .../org/apache/beam/sdk/transforms/Count.java   |  2 +-
 .../org/apache/beam/sdk/transforms/Create.java  |  4 +-
 .../apache/beam/sdk/transforms/Distinct.java    |  4 +-
 .../org/apache/beam/sdk/transforms/Filter.java  |  2 +-
 .../beam/sdk/transforms/FlatMapElements.java    |  2 +-
 .../org/apache/beam/sdk/transforms/Flatten.java |  4 +-
 .../apache/beam/sdk/transforms/GroupByKey.java  |  2 +-
 .../org/apache/beam/sdk/transforms/Keys.java    |  2 +-
 .../org/apache/beam/sdk/transforms/KvSwap.java  |  2 +-
 .../org/apache/beam/sdk/transforms/Latest.java  |  4 +-
 .../apache/beam/sdk/transforms/MapElements.java |  2 +-
 .../apache/beam/sdk/transforms/PTransform.java  |  4 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  4 +-
 .../apache/beam/sdk/transforms/Partition.java   |  2 +-
 .../org/apache/beam/sdk/transforms/Regex.java   | 14 +++---
 .../org/apache/beam/sdk/transforms/Sample.java  |  2 +-
 .../org/apache/beam/sdk/transforms/Values.java  |  2 +-
 .../org/apache/beam/sdk/transforms/View.java    | 12 ++---
 .../apache/beam/sdk/transforms/WithKeys.java    |  2 +-
 .../beam/sdk/transforms/WithTimestamps.java     |  2 +-
 .../beam/sdk/transforms/join/CoGroupByKey.java  |  2 +-
 .../beam/sdk/transforms/windowing/Window.java   |  4 +-
 .../org/apache/beam/sdk/util/Reshuffle.java     |  2 +-
 .../java/org/apache/beam/sdk/PipelineTest.java  |  6 +--
 .../beam/sdk/coders/CoderRegistryTest.java      |  4 +-
 .../java/org/apache/beam/sdk/io/WriteTest.java  |  2 +-
 .../sdk/runners/TransformHierarchyTest.java     |  4 +-
 .../beam/sdk/runners/TransformTreeTest.java     |  4 +-
 .../sdk/transforms/FlatMapElementsTest.java     |  2 +-
 .../beam/sdk/transforms/GroupByKeyTest.java     |  2 +-
 .../beam/sdk/transforms/MapElementsTest.java    |  2 +-
 .../beam/sdk/transforms/PTransformTest.java     |  2 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  4 +-
 .../apache/beam/sdk/transforms/ViewTest.java    |  2 +-
 .../display/DisplayDataEvaluatorTest.java       |  2 +-
 .../display/DisplayDataMatchersTest.java        |  6 +--
 .../sdk/transforms/display/DisplayDataTest.java |  2 +-
 .../sdk/transforms/windowing/WindowingTest.java |  2 +-
 .../apache/beam/sdk/util/StringUtilsTest.java   |  6 +--
 .../org/apache/beam/sdk/values/PDoneTest.java   |  4 +-
 .../beam/sdk/extensions/sorter/SortValues.java  |  2 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  8 +--
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    |  4 +-
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |  4 +-
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     |  4 +-
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  |  4 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 10 ++--
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    |  4 +-
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   |  4 +-
 114 files changed, 238 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index 5be0ddc..d4da542 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -126,7 +126,7 @@ public class WordCount {
   public static class CountWords extends PTransform<PCollection<String>,
       PCollection<KV<String, Long>>> {
     @Override
-    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
 
       // Convert lines of text into individual words.
       PCollection<String> words = lines.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index c556e3f..31b06c9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -113,7 +113,7 @@ public class AutoComplete {
     }
 
     @Override
-    public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) {
+    public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) {
       PCollection<CompletionCandidate> candidates = input
         // First count how often each token appears.
         .apply(new Count.PerElement<String>())
@@ -154,7 +154,7 @@ public class AutoComplete {
     }
 
     @Override
-    public PCollection<KV<String, List<CompletionCandidate>>> apply(
+    public PCollection<KV<String, List<CompletionCandidate>>> expand(
         PCollection<CompletionCandidate> input) {
       return input
         // For each completion candidate, map it to all prefixes.
@@ -209,7 +209,7 @@ public class AutoComplete {
     }
 
     @Override
-    public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
+    public PCollectionList<KV<String, List<CompletionCandidate>>> expand(
           PCollection<CompletionCandidate> input) {
         if (minPrefix > 10) {
           // Base case, partitioning to return the output in the expected format.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index edf48e7..ea015ae 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -159,7 +159,7 @@ public class TfIdf {
     }
 
     @Override
-    public PCollection<KV<URI, String>> apply(PBegin input) {
+    public PCollection<KV<URI, String>> expand(PBegin input) {
       Pipeline pipeline = input.getPipeline();
 
       // Create one TextIO.Read transform for each document
@@ -200,7 +200,7 @@ public class TfIdf {
     public ComputeTfIdf() { }
 
     @Override
-    public PCollection<KV<String, KV<URI, Double>>> apply(
+    public PCollection<KV<String, KV<URI, Double>>> expand(
       PCollection<KV<URI, String>> uriToContent) {
 
       // Compute the total number of documents, and
@@ -390,7 +390,7 @@ public class TfIdf {
     }
 
     @Override
-    public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
+    public PDone expand(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
       return wordToUriAndTfIdf
           .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
             @ProcessElement

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index d57cc3a..df7f81e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -99,7 +99,7 @@ public class TopWikipediaSessions {
   static class ComputeSessions
       extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
     @Override
-    public PCollection<KV<String, Long>> apply(PCollection<String> actions) {
+    public PCollection<KV<String, Long>> expand(PCollection<String> actions) {
       return actions
           .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardHours(1))))
 
@@ -113,7 +113,7 @@ public class TopWikipediaSessions {
   private static class TopPerMonth
       extends PTransform<PCollection<KV<String, Long>>, PCollection<List<KV<String, Long>>>> {
     @Override
-    public PCollection<List<KV<String, Long>>> apply(PCollection<KV<String, Long>> sessions) {
+    public PCollection<List<KV<String, Long>>> expand(PCollection<KV<String, Long>> sessions) {
       return sessions
         .apply(Window.<KV<String, Long>>into(CalendarWindows.months(1)))
 
@@ -154,7 +154,7 @@ public class TopWikipediaSessions {
     }
 
     @Override
-    public PCollection<String> apply(PCollection<TableRow> input) {
+    public PCollection<String> expand(PCollection<TableRow> input) {
       return input
           .apply(ParDo.of(new ExtractUserAndTimestamp()))
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index 0c367d4..c1032b9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -267,7 +267,7 @@ public class TrafficMaxLaneFlow {
   static class MaxLaneFlow
       extends PTransform<PCollection<KV<String, LaneInfo>>, PCollection<TableRow>> {
     @Override
-    public PCollection<TableRow> apply(PCollection<KV<String, LaneInfo>> flowInfo) {
+    public PCollection<TableRow> expand(PCollection<KV<String, LaneInfo>> flowInfo) {
       // stationId, LaneInfo => stationId + max lane flow info
       PCollection<KV<String, LaneInfo>> flowMaxes =
           flowInfo.apply(Combine.<String, LaneInfo>perKey(
@@ -289,7 +289,7 @@ public class TrafficMaxLaneFlow {
     }
 
     @Override
-    public PCollection<String> apply(PBegin begin) {
+    public PCollection<String> expand(PBegin begin) {
       return begin
           .apply(TextIO.Read.from(inputFile))
           .apply(ParDo.of(new ExtractTimestamps()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index 14cee4d..9b5d577 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -274,7 +274,7 @@ public class TrafficRoutes {
   static class TrackSpeed extends
       PTransform<PCollection<KV<String, StationSpeed>>, PCollection<TableRow>> {
     @Override
-    public PCollection<TableRow> apply(PCollection<KV<String, StationSpeed>> stationSpeed) {
+    public PCollection<TableRow> expand(PCollection<KV<String, StationSpeed>> stationSpeed) {
       // Apply a GroupByKey transform to collect a list of all station
       // readings for a given route.
       PCollection<KV<String, Iterable<StationSpeed>>> timeGroup = stationSpeed.apply(
@@ -299,7 +299,7 @@ public class TrafficRoutes {
     }
 
     @Override
-    public PCollection<String> apply(PBegin begin) {
+    public PCollection<String> expand(PBegin begin) {
       return begin
           .apply(TextIO.Read.from(inputFile))
           .apply(ParDo.of(new ExtractTimestamps()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index a4c1a6b..14d0f58 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -108,7 +108,7 @@ public class BigQueryTornadoes {
   static class CountTornadoes
       extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
     @Override
-    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+    public PCollection<TableRow> expand(PCollection<TableRow> rows) {
 
       // row... => month...
       PCollection<Integer> tornadoes = rows.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 93eee15..29655ea 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -125,7 +125,7 @@ public class CombinePerKeyExamples {
   static class PlaysForWord
       extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
     @Override
-    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+    public PCollection<TableRow> expand(PCollection<TableRow> rows) {
 
       // row... => <word, play_name> ...
       PCollection<KV<String, String>> words = rows.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index 6e6452c..fb6b507 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -153,7 +153,7 @@ public class FilterExamples {
 
 
     @Override
-    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+    public PCollection<TableRow> expand(PCollection<TableRow> rows) {
 
       // Extract the mean_temp from each row.
       PCollection<Double> meanTemps = rows.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
index abc10f3..eabc42b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -100,7 +100,7 @@ public class MaxPerKeyExamples {
   static class MaxMeanTemp
       extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
     @Override
-    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+    public PCollection<TableRow> expand(PCollection<TableRow> rows) {
 
       // row... => <month, mean_temp> ...
       PCollection<KV<Integer, Double>> temps = rows.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index d965d4a..bf3afca 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -171,7 +171,7 @@ public class TriggerExample {
     }
 
     @Override
-    public PCollectionList<TableRow> apply(PCollection<KV<String, Integer>> flowInfo) {
+    public PCollectionList<TableRow> expand(PCollection<KV<String, Integer>> flowInfo) {
 
       // Concept #1: The default triggering behavior
       // By default Beam uses a trigger which fires when the watermark has passed the end of the
@@ -332,7 +332,7 @@ public class TriggerExample {
     }
 
     @Override
-    public PCollection<TableRow> apply(PCollection<KV<String, Integer>> flowInfo) {
+    public PCollection<TableRow> expand(PCollection<KV<String, Integer>> flowInfo) {
       PCollection<KV<String, Iterable<Integer>>> flowPerFreeway = flowInfo
           .apply(GroupByKey.<String, Integer>create());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
index 5dbfa70..d7d4dc6 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
@@ -168,7 +168,7 @@ public class AutoCompleteTest implements Serializable {
   private static class ReifyTimestamps<T>
       extends PTransform<PCollection<TimestampedValue<T>>, PCollection<T>> {
     @Override
-    public PCollection<T> apply(PCollection<TimestampedValue<T>> input) {
+    public PCollection<T> expand(PCollection<TimestampedValue<T>> input) {
       return input.apply(ParDo.of(new DoFn<TimestampedValue<T>, T>() {
         @ProcessElement
         public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index f352252..6ad6a23 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -110,7 +110,7 @@ public class GameStats extends LeaderBoard {
     private static final double SCORE_WEIGHT = 2.5;
 
     @Override
-    public PCollection<KV<String, Integer>> apply(PCollection<KV<String, Integer>> userScores) {
+    public PCollection<KV<String, Integer>> expand(PCollection<KV<String, Integer>> userScores) {
 
       // Get the sum of scores for each user.
       PCollection<KV<String, Integer>> sumScores = userScores

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 3275fa0..519bd5f 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -234,7 +234,7 @@ public class LeaderBoard extends HourlyTeamScore {
     }
 
     @Override
-    public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> infos) {
+    public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> infos) {
       return infos.apply("LeaderboardTeamFixedWindows",
           Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
               // We will get early (speculative) results as well as cumulative
@@ -267,7 +267,7 @@ public class LeaderBoard extends HourlyTeamScore {
     }
 
     @Override
-    public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> input) {
+    public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> input) {
       return input.apply("LeaderboardUserGlobalWindow",
           Window.<GameActionInfo>into(new GlobalWindows())
               // Get periodic results every ten minutes.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 2bca7fc..cb81a7e 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -160,7 +160,7 @@ public class UserScore {
     }
 
     @Override
-    public PCollection<KV<String, Integer>> apply(
+    public PCollection<KV<String, Integer>> expand(
         PCollection<GameActionInfo> gameInfo) {
 
       return gameInfo

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index 89fc271..1f33915 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -118,7 +118,7 @@ public class WriteToBigQuery<InputT>
   }
 
   @Override
-  public PDone apply(PCollection<InputT> teamAndScore) {
+  public PDone expand(PCollection<InputT> teamAndScore) {
     return teamAndScore
       .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
       .apply(BigQueryIO.Write

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index 4f2e719..c32289f 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -60,7 +60,7 @@ public class WriteWindowedToBigQuery<T>
   }
 
   @Override
-  public PDone apply(PCollection<T> teamAndScore) {
+  public PDone expand(PCollection<T> teamAndScore) {
     return teamAndScore
       .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
       .apply(BigQueryIO.Write

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 5ce4fef..9507fb9 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -165,7 +165,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     @Override
-    public PCollection<T> apply(PCollection<T> input) {
+    public PCollection<T> expand(PCollection<T> input) {
       WindowingStrategy<?, ?> outputStrategy =
           wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
 
@@ -226,7 +226,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     @Override
-    public PCollectionView<ViewT> apply(PCollection<List<ElemT>> input) {
+    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
       return view;
     }
   }
@@ -252,7 +252,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     @Override
-    public PCollectionView<OutputT> apply(PCollection<InputT> input) {
+    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
       PCollection<OutputT> combined = input
           .apply(Combine.globally(transform.getCombineFn())
               .withoutDefaults().withFanout(transform.getFanout()));
@@ -282,7 +282,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     @Override
-    public PCollectionView<T> apply(PCollection<T> input) {
+    public PCollectionView<T> expand(PCollection<T> input) {
       Combine.Globally<T, T> combine = Combine
           .globally(new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
       if (!transform.hasDefaultValue()) {
@@ -335,7 +335,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     @Override
-    public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(input.getPipeline(),
           input.getWindowingStrategy(), input.getCoder());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
index f2387f5..375932a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
@@ -40,7 +40,7 @@ public class AssignWindows<T, W extends BoundedWindow>
   }
 
   @Override
-  public PCollection<T> apply(PCollection<T> input) {
+  public PCollection<T> expand(PCollection<T> input) {
     return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn)));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
index 43047ca..694c5eb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
@@ -77,7 +77,7 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
   }
 
   @Override
-  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+  public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
     WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
 
     return input
@@ -109,7 +109,7 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     @Override
-    public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(PCollection<KV<K, V>> input) {
+    public PCollection<KV<K, Iterable<WindowedValue<V>>>> expand(PCollection<KV<K, V>> input) {
       return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
     }
@@ -128,7 +128,7 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
           PCollection<KV<K, Iterable<WindowedValue<V>>>>,
           PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
     @Override
-    public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
+    public PCollection<KV<K, Iterable<WindowedValue<V>>>> expand(
         PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
       return input
           .apply(
@@ -225,7 +225,7 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
     }
 
     @Override
-    public PCollection<KV<K, Iterable<V>>> apply(
+    public PCollection<KV<K, Iterable<V>>> expand(
         PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
       @SuppressWarnings("unchecked")
       KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 580e842..0bf882b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -110,7 +110,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
   }
 
   @Override
-  public PCollectionTuple apply(PCollection<InputT> input) {
+  public PCollectionTuple expand(PCollection<InputT> input) {
     return applyTyped(input);
   }
 
@@ -179,7 +179,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
   public static class GBKIntoKeyedWorkItems<KeyT, InputT>
       extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
     @Override
-    public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
+    public PCollection<KeyedWorkItem<KeyT, InputT>> expand(PCollection<KV<KeyT, InputT>> input) {
       return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
     }
@@ -247,7 +247,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     }
 
     @Override
-    public PCollectionTuple apply(
+    public PCollectionTuple expand(
         PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
             input) {
       DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 29dc57e..f3f93e1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -88,7 +88,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
   }
 
   @Override
-  public PCollection<T> apply(PBegin input) {
+  public PCollection<T> expand(PBegin input) {
     return input.getPipeline().apply(
         Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 21776e7..405d913 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -45,7 +45,7 @@ class DirectGroupByKey<K, V>
   }
 
   @Override
-  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+  public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
     @SuppressWarnings("unchecked")
     KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
 
@@ -79,7 +79,7 @@ class DirectGroupByKey<K, V>
   static final class DirectGroupByKeyOnly<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>> {
     @Override
-    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, V>> input) {
+    public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> input) {
       return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
     }
@@ -126,7 +126,7 @@ class DirectGroupByKey<K, V>
     }
 
     @Override
-    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KeyedWorkItem<K, V>> input) {
+    public PCollection<KV<K, Iterable<V>>> expand(PCollection<KeyedWorkItem<K, V>> input) {
       return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(), outputWindowingStrategy, input.isBounded());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
index 77311c2..97c0983 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
@@ -28,15 +28,15 @@ import org.apache.beam.sdk.values.TypedPValue;
 /**
  * A base class for implementing {@link PTransform} overrides, which behave identically to the
  * delegate transform but with overridden methods. Implementors are required to implement
- * {@link #delegate()}, which returns the object to forward calls to, and {@link #apply(PInput)}.
+ * {@link #delegate()}, which returns the object to forward calls to, and {@link #expand(PInput)}.
  */
 public abstract class ForwardingPTransform<InputT extends PInput, OutputT extends POutput>
     extends PTransform<InputT, OutputT> {
   protected abstract PTransform<InputT, OutputT> delegate();
 
   @Override
-  public OutputT apply(InputT input) {
-    return delegate().apply(input);
+  public OutputT expand(InputT input) {
+    return delegate().expand(input);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 9c9256d..8c96e9b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -73,7 +73,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
     }
 
     @Override
-    public PCollectionTuple apply(PCollection<KV<K, InputT>> input) {
+    public PCollectionTuple expand(PCollection<KV<K, InputT>> input) {
 
       PCollectionTuple outputs = input
           .apply("Group by key", GroupByKey.<K, InputT>create())
@@ -106,7 +106,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
       return underlyingParDo.getDefaultOutputCoder(originalInput, output);
     }
 
-    public PCollectionTuple apply(PCollection<? extends KV<K, Iterable<InputT>>> input) {
+    public PCollectionTuple expand(PCollection<? extends KV<K, Iterable<InputT>>> input) {
 
       PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
           input.getPipeline(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
index 7f2de66..10530bb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
@@ -49,7 +49,7 @@ class ParDoSingleViaMultiOverrideFactory<InputT, OutputT>
     }
 
     @Override
-    public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+    public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
 
       // Output tags for ParDo need only be unique up to applied transform
       TupleTag<OutputT> mainOutputTag = new TupleTag<OutputT>(MAIN_OUTPUT_TAG);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 9df7cdc..3601dbc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -170,7 +170,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
       }
 
       @Override
-      public PCollection<T> apply(PBegin input) {
+      public PCollection<T> expand(PBegin input) {
         PipelineRunner<?> runner = input.getPipeline().getRunner();
         checkState(
             runner instanceof DirectRunner,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index b92ade1..460b1c2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -115,7 +115,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
+    public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
       return input.apply(WithKeys.<Void, ElemT>of((Void) null))
           .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
           .apply(GroupByKey.<Void, ElemT>create())
@@ -145,7 +145,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public PCollectionView<ViewT> apply(PCollection<Iterable<ElemT>> input) {
+    public PCollectionView<ViewT> expand(PCollection<Iterable<ElemT>> input) {
       return og.getView();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index cf535cf..3c88337 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -66,7 +66,7 @@ class WriteWithShardingFactory<InputT>
     }
 
     @Override
-    public PDone apply(PCollection<T> input) {
+    public PDone expand(PCollection<T> input) {
       checkArgument(IsBounded.BOUNDED == input.isBounded(),
           "%s can only be applied to a Bounded PCollection",
           getClass().getSimpleName());
@@ -92,7 +92,7 @@ class WriteWithShardingFactory<InputT>
       // without adding a new Write Transform Node, which would be overwritten the same way, leading
       // to an infinite recursion. We cannot modify the number of shards, because that is determined
       // at runtime.
-      return original.apply(resharded);
+      return original.expand(resharded);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index 00dca20..c6986c0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -52,7 +52,7 @@ public class CommittedResultTest implements Serializable {
   private transient AppliedPTransform<?, ?, ?> transform =
       AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() {
         @Override
-        public PDone apply(PBegin begin) {
+        public PDone expand(PBegin begin) {
           throw new IllegalArgumentException("Should never be applied");
         }
       });

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index 5ad278b..b88c9a4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -213,7 +213,7 @@ public class DirectGraphVisitorTest implements Serializable {
         transformed.apply(
             new PTransform<PInput, PDone>() {
               @Override
-              public PDone apply(PInput input) {
+              public PDone expand(PInput input) {
                 return PDone.in(input.getPipeline());
               }
             });

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
index c75adaa..6860a58 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
@@ -66,8 +66,8 @@ public class ForwardingPTransformTest {
     PCollection<Integer> collection = mock(PCollection.class);
     @SuppressWarnings("unchecked")
     PCollection<String> output = mock(PCollection.class);
-    when(delegate.apply(collection)).thenReturn(output);
-    PCollection<String> result = forwarding.apply(collection);
+    when(delegate.expand(collection)).thenReturn(output);
+    PCollection<String> result = forwarding.expand(collection);
     assertThat(result, equalTo(output));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
index cf65936..0852cd3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -163,7 +163,7 @@ public class KeyedPValueTrackingVisitorTest {
 
   private static class PrimitiveKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> {
     @Override
-    public PCollection<K> apply(PCollection<K> input) {
+    public PCollection<K> expand(PCollection<K> input) {
       return PCollection.<K>createPrimitiveOutputInternal(
               input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
           .setCoder(input.getCoder());
@@ -172,7 +172,7 @@ public class KeyedPValueTrackingVisitorTest {
 
   private static class CompositeKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> {
     @Override
-    public PCollection<K> apply(PCollection<K> input) {
+    public PCollection<K> expand(PCollection<K> input) {
       return input.apply(new PrimitiveKeyer<K>()).apply(ParDo.of(new IdentityFn<K>()));
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index b946d98..89e261b 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -176,7 +176,7 @@ public class TFIDF {
     }
 
     @Override
-    public PCollection<KV<URI, String>> apply(PBegin input) {
+    public PCollection<KV<URI, String>> expand(PBegin input) {
       Pipeline pipeline = input.getPipeline();
 
       // Create one TextIO.Read transform for each document
@@ -219,7 +219,7 @@ public class TFIDF {
     public ComputeTfIdf() { }
 
     @Override
-    public PCollection<KV<String, KV<URI, Double>>> apply(
+    public PCollection<KV<String, KV<URI, Double>>> expand(
         PCollection<KV<URI, String>> uriToContent) {
 
       // Compute the total number of documents, and
@@ -419,7 +419,7 @@ public class TFIDF {
     }
 
     @Override
-    public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
+    public PDone expand(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
       return wordToUriAndTfIdf
           .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
             private static final long serialVersionUID = 0;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index c816442..b6b3c1a 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -72,7 +72,7 @@ public class WordCount {
   public static class CountWords extends PTransform<PCollection<String>,
                     PCollection<KV<String, Long>>> {
     @Override
-    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
 
       // Convert lines of text into individual words.
       PCollection<String> words = lines.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 1e0c3ac..3405981 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -82,7 +82,7 @@ public class AutoComplete {
     }
 
     @Override
-    public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) {
+    public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) {
       PCollection<CompletionCandidate> candidates = input
         // First count how often each token appears.
         .apply(new Count.PerElement<String>())
@@ -129,7 +129,7 @@ public class AutoComplete {
     }
 
     @Override
-    public PCollection<KV<String, List<CompletionCandidate>>> apply(
+    public PCollection<KV<String, List<CompletionCandidate>>> expand(
         PCollection<CompletionCandidate> input) {
       return input
         // For each completion candidate, map it to all prefixes.
@@ -192,7 +192,7 @@ public class AutoComplete {
     }
 
     @Override
-    public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
+    public PCollectionList<KV<String, List<CompletionCandidate>>> expand(
           PCollection<CompletionCandidate> input) {
         if (minPrefix > 10) {
           // Base case, partitioning to return the output in the expected format.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 0b92734..7c1284b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -307,7 +307,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     }
 
     @Override
-    public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
       PCollectionView<Map<K, V>> view =
           PCollectionViews.mapView(
               input.getPipeline(),
@@ -352,7 +352,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     }
 
     @Override
-    public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
       PCollectionView<Map<K, Iterable<V>>> view =
           PCollectionViews.multimapView(
               input.getPipeline(),
@@ -392,7 +392,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {}
 
     @Override
-    public PCollectionView<List<T>> apply(PCollection<T> input) {
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
       PCollectionView<List<T>> view =
           PCollectionViews.listView(
               input.getPipeline(),
@@ -423,7 +423,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) { }
 
     @Override
-    public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       PCollectionView<Iterable<T>> view =
           PCollectionViews.iterableView(
               input.getPipeline(),
@@ -465,7 +465,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     }
 
     @Override
-    public PCollectionView<T> apply(PCollection<T> input) {
+    public PCollectionView<T> expand(PCollection<T> input) {
       Combine.Globally<T, T> combine = Combine.globally(
           new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
       if (!transform.hasDefaultValue()) {
@@ -523,7 +523,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     }
 
     @Override
-    public PCollectionView<OutputT> apply(PCollection<InputT> input) {
+    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
       PCollection<OutputT> combined =
           input.apply(Combine.globally(transform.getCombineFn())
               .withoutDefaults()
@@ -620,7 +620,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     }
 
     @Override
-    public PCollectionView<ViewT> apply(PCollection<List<ElemT>> input) {
+    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
       return view;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 40d8948..00c94d0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -756,7 +756,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollection<KV<K1, Iterable<KV<K2, V>>>> apply(PCollection<KV<K1, KV<K2, V>>> input) {
+    public PCollection<KV<K1, Iterable<KV<K2, V>>>> expand(PCollection<KV<K1, KV<K2, V>>> input) {
       PCollection<KV<K1, Iterable<KV<K2, V>>>> rval =
           PCollection.<KV<K1, Iterable<KV<K2, V>>>>createPrimitiveOutputInternal(
           input.getPipeline(),
@@ -814,7 +814,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>> apply(PCollection<T> input) {
+    public PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>> expand(
+        PCollection<T> input) {
       @SuppressWarnings("unchecked")
       Coder<W> windowCoder = (Coder<W>)
           input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -902,7 +903,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<T> apply(PCollection<T> input) {
+    public PCollectionView<T> expand(PCollection<T> input) {
       @SuppressWarnings("unchecked")
       Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>)
           input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -993,7 +994,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(
           input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
       return BatchViewAsList.applyForIterableLike(runner, input, view);
@@ -1097,7 +1098,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<List<T>> apply(PCollection<T> input) {
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
       PCollectionView<List<T>> view = PCollectionViews.listView(
           input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
       return applyForIterableLike(runner, input, view);
@@ -1265,7 +1266,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
       return this.<BoundedWindow>applyInternal(input);
     }
 
@@ -1406,7 +1407,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
       @Override
       public PCollection<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>>
-          apply(PCollection<KV<K, V>> input) {
+      expand(PCollection<KV<K, V>> input) {
 
         @SuppressWarnings("unchecked")
         Coder<W> windowCoder = (Coder<W>)
@@ -1754,7 +1755,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
       return this.<BoundedWindow>applyInternal(input);
     }
 
@@ -2056,13 +2057,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PDone apply(PCollection<T> input) {
+    public PDone expand(PCollection<T> input) {
       if (transform.getSink() instanceof FileBasedSink) {
         FileBasedSink<?> sink = (FileBasedSink<?>) transform.getSink();
         PathValidator validator = runner.options.getPathValidator();
         validator.validateOutputFilePrefixSupported(sink.getBaseOutputFilename());
       }
-      return transform.apply(input);
+      return transform.expand(input);
     }
   }
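
The BatchWrite hunk above also shows the knock-on change for wrapper transforms: when one transform's expansion is defined in terms of another transform, the delegation now calls expand on the wrapped transform, while elements and pipelines are still composed with apply. A hedged sketch of that pattern with made-up names (ValidatingWrite and its validation hook are not part of this commit):

    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PDone;

    // Illustrative wrapper: checks its input, then defers to the wrapped transform's expansion.
    public class ValidatingWrite<T> extends PTransform<PCollection<T>, PDone> {
      private final PTransform<PCollection<T>, PDone> wrapped;

      public ValidatingWrite(PTransform<PCollection<T>, PDone> wrapped) {
        this.wrapped = wrapped;
      }

      @Override
      public PDone expand(PCollection<T> input) {
        // ... runner- or sink-specific validation would go here ...
        // Delegation mirrors the change above: transform.apply(input) -> transform.expand(input).
        return wrapped.expand(input);
      }
    }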
 
@@ -2071,7 +2072,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   // ================================================================================
 
   /**
-   * Suppress application of {@link PubsubUnboundedSource#apply} in streaming mode so that we
+   * Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we
    * can instead defer to Windmill's implementation.
    */
   private static class StreamingPubsubIORead<T> extends PTransform<PBegin, PCollection<T>> {
@@ -2090,7 +2091,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollection<T> apply(PBegin input) {
+    public PCollection<T> expand(PBegin input) {
       return PCollection.<T>createPrimitiveOutputInternal(
           input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
           .setCoder(transform.getElementCoder());
@@ -2155,7 +2156,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   /**
-   * Suppress application of {@link PubsubUnboundedSink#apply} in streaming mode so that we
+   * Suppress application of {@link PubsubUnboundedSink#expand} in streaming mode so that we
    * can instead defer to Windmill's implementation.
    */
   private static class StreamingPubsubIOWrite<T> extends PTransform<PCollection<T>, PDone> {
@@ -2174,7 +2175,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PDone apply(PCollection<T> input) {
+    public PDone expand(PCollection<T> input) {
       return PDone.in(input.getPipeline());
     }
 
@@ -2252,7 +2253,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public final PCollection<T> apply(PInput input) {
+    public final PCollection<T> expand(PInput input) {
       source.validate();
 
       if (source.requiresDeduping()) {
@@ -2277,7 +2278,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       }
 
       @Override
-      public final PCollection<ValueWithRecordId<T>> apply(PInput input) {
+      public final PCollection<ValueWithRecordId<T>> expand(PInput input) {
         return PCollection.<ValueWithRecordId<T>>createPrimitiveOutputInternal(
             input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED);
       }
@@ -2327,7 +2328,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     // more per-key overhead.
     private static final int NUM_RESHARD_KEYS = 10000;
     @Override
-    public PCollection<T> apply(PCollection<ValueWithRecordId<T>> input) {
+    public PCollection<T> expand(PCollection<ValueWithRecordId<T>> input) {
       return input
           .apply(WithKeys.of(new SerializableFunction<ValueWithRecordId<T>, Integer>() {
                     @Override
@@ -2367,7 +2368,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public final PCollection<T> apply(PBegin input) {
+    public final PCollection<T> expand(PBegin input) {
       source.validate();
 
       return Pipeline.applyTransform(input, new DataflowUnboundedReadFromBoundedSource<>(source))
@@ -2425,7 +2426,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
       PCollectionView<Map<K, V>> view =
           PCollectionViews.mapView(
               input.getPipeline(),
@@ -2470,7 +2471,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
       PCollectionView<Map<K, Iterable<V>>> view =
           PCollectionViews.multimapView(
               input.getPipeline(),
@@ -2511,7 +2512,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     public StreamingViewAsList(DataflowRunner runner, View.AsList<T> transform) {}
 
     @Override
-    public PCollectionView<List<T>> apply(PCollection<T> input) {
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
       PCollectionView<List<T>> view =
           PCollectionViews.listView(
               input.getPipeline(),
@@ -2543,7 +2544,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     public StreamingViewAsIterable(DataflowRunner runner, View.AsIterable<T> transform) { }
 
     @Override
-    public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       PCollectionView<Iterable<T>> view =
           PCollectionViews.iterableView(
               input.getPipeline(),
@@ -2586,7 +2587,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<T> apply(PCollection<T> input) {
+    public PCollectionView<T> expand(PCollection<T> input) {
       Combine.Globally<T, T> combine = Combine.globally(
           new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
       if (!transform.hasDefaultValue()) {
@@ -2644,7 +2645,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollectionView<OutputT> apply(PCollection<InputT> input) {
+    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
       PCollection<OutputT> combined =
           input.apply(Combine.<InputT, OutputT>globally(transform.getCombineFn())
               .withoutDefaults()
@@ -2770,7 +2771,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
 
     @Override
-    public OutputT apply(InputT input) {
+    public OutputT expand(InputT input) {
       String mode = input.getPipeline().getOptions().as(StreamingOptions.class).isStreaming()
           ? "streaming" : "batch";
       String name =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
index 62d4aff..68ee7bc 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
@@ -53,7 +53,7 @@ public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>>
   }
 
   @Override
-  public PCollection<T> apply(PCollection<T> input) {
+  public PCollection<T> expand(PCollection<T> input) {
     WindowingStrategy<?, ?> outputStrategy =
         transform.getOutputStrategyInternal(input.getWindowingStrategy());
     if (transform.getWindowFn() != null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index 96a35bc..e1eedd8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -93,7 +93,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin
   }
 
   @Override
-  public PCollection<T> apply(PBegin input) {
+  public PCollection<T> expand(PBegin input) {
     return input.getPipeline().apply(
         Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 95c7132..ac4f2df 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -551,7 +551,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     }
 
     @Override
-    public PCollection<String> apply(PCollection<String> input) {
+    public PCollection<String> expand(PCollection<String> input) {
       return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(),
           WindowingStrategy.globalDefault(),
@@ -585,7 +585,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
       extends PTransform<PCollection<Integer>, PCollection<Integer>> {
 
     @Override
-    public PCollection<Integer> apply(PCollection<Integer> input) {
+    public PCollection<Integer> expand(PCollection<Integer> input) {
       // Apply an operation so that this is a composite transform.
       input.apply(Count.<Integer>perElement());
 
@@ -606,7 +606,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
       extends PTransform<PCollection<Integer>, PDone> {
 
     @Override
-    public PDone apply(PCollection<Integer> input) {
+    public PDone expand(PCollection<Integer> input) {
       // Apply an operation so that this is a composite transform.
       input.apply(Count.<Integer>perElement());
 
@@ -631,7 +631,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     public final TupleTag<Void> doneTag = new TupleTag<>("done");
 
     @Override
-    public PCollectionTuple apply(PCollection<Integer> input) {
+    public PCollectionTuple expand(PCollection<Integer> input) {
       PCollection<Integer> sum = input.apply(Sum.integersGlobally());
 
       // Fails here when attempting to construct a tuple with an unbound object.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 5375c95..1959be5 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -840,7 +840,7 @@ public class DataflowRunnerTest {
     public boolean translated = false;
 
     @Override
-    public PCollection<Integer> apply(PCollection<Integer> input) {
+    public PCollection<Integer> expand(PCollection<Integer> input) {
       return PCollection.<Integer>createPrimitiveOutputInternal(
           input.getPipeline(),
           WindowingStrategy.globalDefault(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
index 67408ae..c9c7806 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -104,7 +104,7 @@ public class DataflowGroupByKeyTest {
         p.apply(
             new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
               @Override
-              public PCollection<KV<String, Integer>> apply(PBegin input) {
+              public PCollection<KV<String, Integer>> expand(PBegin input) {
                 return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
                         input.getPipeline(),
                         WindowingStrategy.globalDefault(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
index b9220af..4558683 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -93,7 +93,7 @@ public class DataflowViewTest {
         .apply(
             new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
               @Override
-              public PCollection<KV<String, Integer>> apply(PBegin input) {
+              public PCollection<KV<String, Integer>> expand(PBegin input) {
                 return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
                         input.getPipeline(),
                         WindowingStrategy.globalDefault(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index d7e5207..b2672b5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -85,7 +85,7 @@ public class WordCount {
   public static class CountWords extends PTransform<PCollection<String>,
       PCollection<KV<String, Long>>> {
     @Override
-    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
 
       // Convert lines of text into individual words.
       PCollection<String> words = lines.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
index b1c567c..0a56633 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
@@ -62,7 +62,7 @@ public final class ConsoleIO {
       }
 
       @Override
-      public PDone apply(PCollection<T> input) {
+      public PDone expand(PCollection<T> input) {
         return PDone.in(input.getPipeline());
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index a08c54e..7ebba90 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -63,7 +63,7 @@ public final class CreateStream<T> {
     }
 
     @Override
-    public PCollection<T> apply(PBegin input) {
+    public PCollection<T> expand(PBegin input) {
       // Spark streaming micro batches are bounded by default
       return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
           WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
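
Primitive transforms such as CreateStream follow the same shape: expand (formerly apply) declares the output PCollection directly and leaves evaluation to the runner. A small sketch under that assumption (EmptyUnboundedOutput is a made-up name, and the import assumes the org.apache.beam.sdk.util location of WindowingStrategy used elsewhere in this tree):

    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.util.WindowingStrategy;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;

    // Illustrative primitive PBegin -> PCollection transform after the rename.
    public class EmptyUnboundedOutput<T> extends PTransform<PBegin, PCollection<T>> {
      @Override
      public PCollection<T> expand(PBegin input) {  // the override point, formerly apply(PBegin)
        // A primitive transform creates its output directly; a runner provides the evaluation.
        return PCollection.createPrimitiveOutputInternal(
            input.getPipeline(),
            WindowingStrategy.globalDefault(),
            PCollection.IsBounded.UNBOUNDED);
      }
    }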

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
index 042c316..f2457ce 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
@@ -94,7 +94,7 @@ public final class HadoopIO {
       }
 
       @Override
-      public PCollection<KV<K, V>> apply(PBegin input) {
+      public PCollection<KV<K, V>> expand(PBegin input) {
         return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
             WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED);
       }
@@ -197,7 +197,7 @@ public final class HadoopIO {
       }
 
       @Override
-      public PDone apply(PCollection<KV<K, V>> input) {
+      public PDone expand(PCollection<KV<K, V>> input) {
         checkNotNull(
             filenamePrefix, "need to set the filename prefix of an HadoopIO.Write transform");
         checkNotNull(formatClass, "need to set the format class of an HadoopIO.Write transform");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
index 6944dbf..30b51e6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.PCollection;
 public final class StorageLevelPTransform extends PTransform<PCollection<?>, PCollection<String>> {
 
   @Override
-  public PCollection<String> apply(PCollection<?> input) {
+  public PCollection<String> expand(PCollection<?> input) {
     return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
         WindowingStrategy.globalDefault(),
         PCollection.IsBounded.BOUNDED);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
index 654614a..7580da7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
@@ -35,7 +35,7 @@ public class SinglePrimitiveOutputPTransform<T> extends PTransform<PInput, PColl
   }
 
   @Override
-  public PCollection<T> apply(PInput input) {
+  public PCollection<T> expand(PInput input) {
     try {
       PCollection<T> collection = PCollection.<T>createPrimitiveOutputInternal(
               input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index bd0c655..01a4cba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -281,7 +281,7 @@ public class AvroIO {
       }
 
       @Override
-      public PCollection<T> apply(PBegin input) {
+      public PCollection<T> expand(PBegin input) {
         if (filepattern == null) {
           throw new IllegalStateException(
               "need to set the filepattern of an AvroIO.Read transform");
@@ -795,7 +795,7 @@ public class AvroIO {
       }
 
       @Override
-      public PDone apply(PCollection<T> input) {
+      public PDone expand(PCollection<T> input) {
         if (filenamePrefix == null) {
           throw new IllegalStateException(
               "need to set the filename prefix of an AvroIO.Write transform");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 123dca8..f2ef358 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -84,7 +84,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
   }
 
   @Override
-  public PCollection<T> apply(PBegin input) {
+  public PCollection<T> expand(PBegin input) {
     PCollection<ValueWithRecordId<T>> read = Pipeline.applyTransform(input,
         Read.from(new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime)));
     if (source.requiresDeduping()) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
index 456d291..3148d8d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
@@ -130,7 +130,7 @@ public class CountingInput {
     }
 
     @Override
-    public PCollection<Long> apply(PBegin begin) {
+    public PCollection<Long> expand(PBegin begin) {
       return begin.apply(Read.from(CountingSource.createSourceForSubrange(startIndex, endIndex)));
     }
 
@@ -236,7 +236,7 @@ public class CountingInput {
 
     @SuppressWarnings("deprecation")
     @Override
-    public PCollection<Long> apply(PBegin begin) {
+    public PCollection<Long> expand(PBegin begin) {
       Unbounded<Long> read =
           Read.from(
               CountingSource.createUnbounded()