You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/08 17:47:00 UTC
[1/3] incubator-beam git commit: This closes #1538
Repository: incubator-beam
Updated Branches:
refs/heads/master 04a41ee54 -> 692905705
This closes #1538
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/69290570
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/69290570
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/69290570
Branch: refs/heads/master
Commit: 692905705fee3666bdbf85e67e5a037244d668b8
Parents: 04a41ee 4d607b5
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Dec 8 09:35:36 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:35:36 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/examples/WordCount.java | 2 +-
.../beam/examples/complete/AutoComplete.java | 6 +--
.../apache/beam/examples/complete/TfIdf.java | 6 +--
.../examples/complete/TopWikipediaSessions.java | 6 +--
.../examples/complete/TrafficMaxLaneFlow.java | 4 +-
.../beam/examples/complete/TrafficRoutes.java | 4 +-
.../examples/cookbook/BigQueryTornadoes.java | 2 +-
.../cookbook/CombinePerKeyExamples.java | 2 +-
.../beam/examples/cookbook/FilterExamples.java | 2 +-
.../examples/cookbook/MaxPerKeyExamples.java | 2 +-
.../beam/examples/cookbook/TriggerExample.java | 4 +-
.../examples/complete/AutoCompleteTest.java | 2 +-
.../beam/examples/complete/game/GameStats.java | 2 +-
.../examples/complete/game/LeaderBoard.java | 4 +-
.../beam/examples/complete/game/UserScore.java | 2 +-
.../complete/game/utils/WriteToBigQuery.java | 2 +-
.../game/utils/WriteWindowedToBigQuery.java | 2 +-
.../apache/beam/runners/apex/ApexRunner.java | 10 ++--
.../apache/beam/runners/core/AssignWindows.java | 2 +-
.../core/GroupByKeyViaGroupByKeyOnly.java | 8 +--
.../beam/runners/core/SplittableParDo.java | 6 +--
.../core/UnboundedReadFromBoundedSource.java | 2 +-
.../beam/runners/direct/DirectGroupByKey.java | 6 +--
.../runners/direct/ForwardingPTransform.java | 6 +--
.../direct/ParDoMultiOverrideFactory.java | 4 +-
.../ParDoSingleViaMultiOverrideFactory.java | 2 +-
.../direct/TestStreamEvaluatorFactory.java | 2 +-
.../runners/direct/ViewEvaluatorFactory.java | 4 +-
.../direct/WriteWithShardingFactory.java | 4 +-
.../runners/direct/CommittedResultTest.java | 2 +-
.../runners/direct/DirectGraphVisitorTest.java | 2 +-
.../direct/ForwardingPTransformTest.java | 4 +-
.../direct/KeyedPValueTrackingVisitorTest.java | 4 +-
.../beam/runners/flink/examples/TFIDF.java | 6 +--
.../beam/runners/flink/examples/WordCount.java | 2 +-
.../flink/examples/streaming/AutoComplete.java | 6 +--
.../apache/beam/runners/flink/FlinkRunner.java | 14 +++---
.../beam/runners/dataflow/DataflowRunner.java | 51 ++++++++++----------
.../dataflow/internal/AssignWindows.java | 2 +-
.../DataflowUnboundedReadFromBoundedSource.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 8 +--
.../runners/dataflow/DataflowRunnerTest.java | 2 +-
.../transforms/DataflowGroupByKeyTest.java | 2 +-
.../dataflow/transforms/DataflowViewTest.java | 2 +-
.../beam/runners/spark/examples/WordCount.java | 2 +-
.../apache/beam/runners/spark/io/ConsoleIO.java | 2 +-
.../beam/runners/spark/io/CreateStream.java | 2 +-
.../beam/runners/spark/io/hadoop/HadoopIO.java | 4 +-
.../translation/StorageLevelPTransform.java | 2 +-
.../util/SinglePrimitiveOutputPTransform.java | 2 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 4 +-
.../sdk/io/BoundedReadFromUnboundedSource.java | 2 +-
.../org/apache/beam/sdk/io/CountingInput.java | 4 +-
.../java/org/apache/beam/sdk/io/PubsubIO.java | 4 +-
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 2 +-
.../beam/sdk/io/PubsubUnboundedSource.java | 2 +-
.../main/java/org/apache/beam/sdk/io/Read.java | 4 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 4 +-
.../main/java/org/apache/beam/sdk/io/Write.java | 2 +-
.../apache/beam/sdk/runners/PipelineRunner.java | 2 +-
.../apache/beam/sdk/testing/GatherAllPanes.java | 2 +-
.../org/apache/beam/sdk/testing/PAssert.java | 12 ++---
.../org/apache/beam/sdk/testing/TestStream.java | 2 +-
.../beam/sdk/transforms/ApproximateUnique.java | 4 +-
.../org/apache/beam/sdk/transforms/Combine.java | 10 ++--
.../org/apache/beam/sdk/transforms/Count.java | 2 +-
.../org/apache/beam/sdk/transforms/Create.java | 4 +-
.../apache/beam/sdk/transforms/Distinct.java | 4 +-
.../org/apache/beam/sdk/transforms/Filter.java | 2 +-
.../beam/sdk/transforms/FlatMapElements.java | 2 +-
.../org/apache/beam/sdk/transforms/Flatten.java | 4 +-
.../apache/beam/sdk/transforms/GroupByKey.java | 2 +-
.../org/apache/beam/sdk/transforms/Keys.java | 2 +-
.../org/apache/beam/sdk/transforms/KvSwap.java | 2 +-
.../org/apache/beam/sdk/transforms/Latest.java | 4 +-
.../apache/beam/sdk/transforms/MapElements.java | 2 +-
.../apache/beam/sdk/transforms/PTransform.java | 4 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 4 +-
.../apache/beam/sdk/transforms/Partition.java | 2 +-
.../org/apache/beam/sdk/transforms/Regex.java | 14 +++---
.../org/apache/beam/sdk/transforms/Sample.java | 2 +-
.../org/apache/beam/sdk/transforms/Values.java | 2 +-
.../org/apache/beam/sdk/transforms/View.java | 12 ++---
.../apache/beam/sdk/transforms/WithKeys.java | 2 +-
.../beam/sdk/transforms/WithTimestamps.java | 2 +-
.../beam/sdk/transforms/join/CoGroupByKey.java | 2 +-
.../beam/sdk/transforms/windowing/Window.java | 4 +-
.../org/apache/beam/sdk/util/Reshuffle.java | 2 +-
.../java/org/apache/beam/sdk/PipelineTest.java | 6 +--
.../beam/sdk/coders/CoderRegistryTest.java | 4 +-
.../java/org/apache/beam/sdk/io/WriteTest.java | 2 +-
.../sdk/runners/TransformHierarchyTest.java | 4 +-
.../beam/sdk/runners/TransformTreeTest.java | 4 +-
.../sdk/transforms/FlatMapElementsTest.java | 2 +-
.../beam/sdk/transforms/GroupByKeyTest.java | 2 +-
.../beam/sdk/transforms/MapElementsTest.java | 2 +-
.../beam/sdk/transforms/PTransformTest.java | 2 +-
.../apache/beam/sdk/transforms/ParDoTest.java | 4 +-
.../apache/beam/sdk/transforms/ViewTest.java | 2 +-
.../display/DisplayDataEvaluatorTest.java | 2 +-
.../display/DisplayDataMatchersTest.java | 6 +--
.../sdk/transforms/display/DisplayDataTest.java | 2 +-
.../sdk/transforms/windowing/WindowingTest.java | 2 +-
.../apache/beam/sdk/util/StringUtilsTest.java | 6 +--
.../org/apache/beam/sdk/values/PDoneTest.java | 4 +-
.../beam/sdk/extensions/sorter/SortValues.java | 2 +-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 8 +--
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 4 +-
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 4 +-
.../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 4 +-
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 4 +-
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 10 ++--
.../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 4 +-
.../apache/beam/sdk/io/mongodb/MongoDbIO.java | 4 +-
114 files changed, 238 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Rename PTransform.apply to PTransform.expand
Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 9768788..9a6b534 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -700,7 +700,7 @@ public class PubsubIO {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
if (topic == null && subscription == null) {
throw new IllegalStateException("Need to set either the topic or the subscription for "
+ "a PubsubIO.Read transform");
@@ -1057,7 +1057,7 @@ public class PubsubIO {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
if (topic == null) {
throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 1a86a1c..1992cb8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -424,7 +424,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
input.apply("PubsubUnboundedSink.Window", Window.<T>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index cea74bc..da3b437 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -1339,7 +1339,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
return input.getPipeline().begin()
.apply(Read.from(new PubsubSource<T>(this)))
.apply("PubsubUnboundedSource.Stats",
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index f04fbaf..7ec3b0e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -101,7 +101,7 @@ public class Read {
}
@Override
- public final PCollection<T> apply(PBegin input) {
+ public final PCollection<T> expand(PBegin input) {
source.validate();
return PCollection.<T>createPrimitiveOutputInternal(input.getPipeline(),
@@ -169,7 +169,7 @@ public class Read {
}
@Override
- public final PCollection<T> apply(PBegin input) {
+ public final PCollection<T> expand(PBegin input) {
source.validate();
return PCollection.<T>createPrimitiveOutputInternal(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index e967a27..54e73d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -292,7 +292,7 @@ public class TextIO {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
if (filepattern == null) {
throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
}
@@ -742,7 +742,7 @@ public class TextIO {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
if (filenamePrefix == null) {
throw new IllegalStateException(
"need to set the filename prefix of a TextIO.Write transform");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index 7559fca..bc651d8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -104,7 +104,7 @@ public class Write {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
checkArgument(IsBounded.BOUNDED == input.isBounded(),
"%s can only be applied to a Bounded PCollection",
Write.class.getSimpleName());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index 77f5128..8604dbc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -73,6 +73,6 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
*/
public <OutputT extends POutput, InputT extends PInput> OutputT apply(
PTransform<InputT, OutputT> transform, InputT input) {
- return transform.apply(input);
+ return transform.expand(input);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
index 2b311b7..bf2cd0b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
@@ -55,7 +55,7 @@ class GatherAllPanes<T>
private GatherAllPanes() {}
@Override
- public PCollection<Iterable<ValueInSingleWindow<T>>> apply(PCollection<T> input) {
+ public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();
return input
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 7dc78d8..b23f4f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -763,7 +763,7 @@ public class PAssert {
}
@Override
- public PCollectionView<ActualT> apply(PBegin input) {
+ public PCollectionView<ActualT> expand(PBegin input) {
final Coder<T> coder = actual.getCoder();
return actual
.apply("FilterActuals", rewindowActuals.<T>prepareActuals())
@@ -833,7 +833,7 @@ public class PAssert {
}
@Override
- public PCollection<Iterable<ValueInSingleWindow<T>>> apply(PCollection<T> input) {
+ public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
final int combinedKey = 42;
// Remove the triggering on both
@@ -925,7 +925,7 @@ public class PAssert {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
input
.apply("GroupGlobally", new GroupGlobally<T>(rewindowingStrategy))
.apply("GetPane", MapElements.via(paneExtractor))
@@ -958,7 +958,7 @@ public class PAssert {
}
@Override
- public PDone apply(PCollection<Iterable<T>> input) {
+ public PDone expand(PCollection<Iterable<T>> input) {
input
.apply("GroupGlobally", new GroupGlobally<Iterable<T>>(rewindowingStrategy))
.apply("GetPane", MapElements.via(paneExtractor))
@@ -995,7 +995,7 @@ public class PAssert {
}
@Override
- public PDone apply(PBegin input) {
+ public PDone expand(PBegin input) {
final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual);
input
@@ -1321,7 +1321,7 @@ public class PAssert {
}
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
return input.apply("FilterWindows", ParDo.of(new Fn()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index 509bb24..da93cdc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -252,7 +252,7 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
throw new IllegalStateException(
String.format(
"Pipeline Runner %s does not provide a required override for %s",
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
index 71c2158..33820e0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
@@ -204,7 +204,7 @@ public class ApproximateUnique {
}
@Override
- public PCollection<Long> apply(PCollection<T> input) {
+ public PCollection<Long> expand(PCollection<T> input) {
Coder<T> coder = input.getCoder();
return input.apply(
Combine.globally(
@@ -271,7 +271,7 @@ public class ApproximateUnique {
}
@Override
- public PCollection<KV<K, Long>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KV<K, Long>> expand(PCollection<KV<K, V>> input) {
Coder<KV<K, V>> inputCoder = input.getCoder();
if (!(inputCoder instanceof KvCoder)) {
throw new IllegalStateException(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 4127d94..3b07260 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1431,7 +1431,7 @@ public class Combine {
}
@Override
- public PCollection<OutputT> apply(PCollection<InputT> input) {
+ public PCollection<OutputT> expand(PCollection<InputT> input) {
PCollection<KV<Void, InputT>> withKeys = input
.apply(WithKeys.<Void, InputT>of((Void) null))
.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
@@ -1569,7 +1569,7 @@ public class Combine {
}
@Override
- public PCollectionView<OutputT> apply(PCollection<InputT> input) {
+ public PCollectionView<OutputT> expand(PCollection<InputT> input) {
Globally<InputT, OutputT> combineGlobally =
Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout);
if (insertDefault) {
@@ -1866,7 +1866,7 @@ public class Combine {
}
@Override
- public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) {
+ public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
return input
.apply(GroupByKey.<K, InputT>create(fewKeys))
.apply(Combine.<K, InputT, OutputT>groupedValues(fn, fnDisplayData)
@@ -1901,7 +1901,7 @@ public class Combine {
}
@Override
- public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) {
+ public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
return applyHelper(input);
}
@@ -2388,7 +2388,7 @@ public class Combine {
}
@Override
- public PCollection<KV<K, OutputT>> apply(
+ public PCollection<KV<K, OutputT>> expand(
PCollection<? extends KV<K, ? extends Iterable<InputT>>> input) {
PCollection<KV<K, OutputT>> output = input.apply(ParDo.of(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index b393a30..9101996 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -103,7 +103,7 @@ public class Count {
public PerElement() { }
@Override
- public PCollection<KV<T, Long>> apply(PCollection<T> input) {
+ public PCollection<KV<T, Long>> expand(PCollection<T> input) {
return
input
.apply("Init", MapElements.via(new SimpleFunction<T, KV<T, Void>>() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 7cd4711..a48136f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -240,7 +240,7 @@ public class Create<T> {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
try {
Coder<T> coder = getDefaultOutputCoder(input);
try {
@@ -440,7 +440,7 @@ public class Create<T> {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
try {
Iterable<T> rawElements =
Iterables.transform(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
index fba428b..2d08cee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
@@ -82,7 +82,7 @@ public class Distinct<T> extends PTransform<PCollection<T>,
}
@Override
- public PCollection<T> apply(PCollection<T> in) {
+ public PCollection<T> expand(PCollection<T> in) {
return in
.apply("CreateIndex", MapElements.via(new SimpleFunction<T, KV<T, Void>>() {
@Override
@@ -121,7 +121,7 @@ public class Distinct<T> extends PTransform<PCollection<T>,
}
@Override
- public PCollection<T> apply(PCollection<T> in) {
+ public PCollection<T> expand(PCollection<T> in) {
WithKeys<IdT, T> withKeys = WithKeys.of(fn);
if (representativeType != null) {
withKeys = withKeys.withKeyType(representativeType);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index 2d9bdee..a564999 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -201,7 +201,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
}
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
return input.apply(ParDo.of(new DoFn<T, T>() {
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index 4ef809f..c165f7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -129,7 +129,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
}
@Override
- public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+ public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
return input.apply(
"FlatMap",
ParDo.of(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index e612836..3ef2e55 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -105,7 +105,7 @@ public class Flatten {
private FlattenPCollectionList() { }
@Override
- public PCollection<T> apply(PCollectionList<T> inputs) {
+ public PCollection<T> expand(PCollectionList<T> inputs) {
WindowingStrategy<?, ?> windowingStrategy;
IsBounded isBounded = IsBounded.BOUNDED;
if (!inputs.getAll().isEmpty()) {
@@ -163,7 +163,7 @@ public class Flatten {
extends PTransform<PCollection<? extends Iterable<T>>, PCollection<T>> {
@Override
- public PCollection<T> apply(PCollection<? extends Iterable<T>> in) {
+ public PCollection<T> expand(PCollection<? extends Iterable<T>> in) {
Coder<? extends Iterable<T>> inCoder = in.getCoder();
if (!(inCoder instanceof IterableLikeCoder)) {
throw new IllegalArgumentException(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index 1faac59..a339af7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -216,7 +216,7 @@ public class GroupByKey<K, V>
}
@Override
- public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
// This primitive operation groups by the combination of key and window,
// merging windows as needed, using the windows assigned to the
// key/value input elements and the window merge operation of the
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
index 2405adf..c6f307d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
@@ -56,7 +56,7 @@ public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>,
private Keys() { }
@Override
- public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) {
+ public PCollection<K> expand(PCollection<? extends KV<K, ?>> in) {
return
in.apply("Keys", MapElements.via(new SimpleFunction<KV<K, ?>, K>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
index 2b81ebf..dbe262b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
@@ -60,7 +60,7 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>,
private KvSwap() { }
@Override
- public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) {
+ public PCollection<KV<V, K>> expand(PCollection<KV<K, V>> in) {
return
in.apply("KvSwap", MapElements.via(new SimpleFunction<KV<K, V>, KV<V, K>>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
index 83cceca..9c2d715 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
@@ -159,7 +159,7 @@ public class Latest {
private static class Globally<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
Coder<T> inputCoder = input.getCoder();
return input
@@ -178,7 +178,7 @@ public class Latest {
private static class PerKey<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
@Override
- public PCollection<KV<K, V>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
checkNotNull(input);
checkArgument(input.getCoder() instanceof KvCoder,
"Input specifiedCoder must be an instance of KvCoder, but was %s", input.getCoder());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index c109034..421b2ab 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -111,7 +111,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
}
@Override
- public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+ public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
return input.apply(
"Map",
ParDo.of(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 83fe577..ce4891d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -129,7 +129,7 @@ import org.apache.beam.sdk.values.TypedPValue;
* output value as their apply implementation.
* The majority of PTransforms are
* implemented as composites of other PTransforms. Such a PTransform
- * subclass typically just implements {@link #apply}, computing its
+ * subclass typically just implements {@link #expand}, computing its
* Output value from its {@code InputT} value. User programs are encouraged to
* use this mechanism to modularize their own code. Such composite
* abstractions get their own name, and navigating through the
@@ -181,7 +181,7 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
* a new unbound output and register evaluators (via backend-specific
* registration methods).
*/
- public abstract OutputT apply(InputT input);
+ public abstract OutputT expand(InputT input);
/**
* Called before invoking apply (which may be intercepted by the runner) to
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index ba6e644..4f7491e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -795,7 +795,7 @@ public class ParDo {
}
@Override
- public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+ public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
checkArgument(
!isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner");
validateWindowType(input, fn);
@@ -1052,7 +1052,7 @@ public class ParDo {
@Override
- public PCollectionTuple apply(PCollection<? extends InputT> input) {
+ public PCollectionTuple expand(PCollection<? extends InputT> input) {
checkArgument(
!isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner");
validateWindowType(input, fn);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
index b525872..e0b2b61 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
@@ -100,7 +100,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
/////////////////////////////////////////////////////////////////////////////
@Override
- public PCollectionList<T> apply(PCollection<T> in) {
+ public PCollectionList<T> expand(PCollection<T> in) {
final TupleTagList outputTags = partitionDoFn.getOutputTags();
PCollectionTuple outputs = in.apply(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
index a94130d..14c5d1b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
@@ -179,7 +179,7 @@ public class Regex {
this.group = group;
}
- public PCollection<String> apply(PCollection<String> in) {
+ public PCollection<String> expand(PCollection<String> in) {
return in.apply(
ParDo.of(
new DoFn<String, String>() {
@@ -224,7 +224,7 @@ public class Regex {
this.valueGroup = valueGroup;
}
- public PCollection<KV<String, String>> apply(PCollection<String> in) {
+ public PCollection<KV<String, String>> expand(PCollection<String> in) {
return in.apply(
ParDo.of(
new DoFn<String, KV<String, String>>() {
@@ -266,7 +266,7 @@ public class Regex {
this.group = group;
}
- public PCollection<String> apply(PCollection<String> in) {
+ public PCollection<String> expand(PCollection<String> in) {
return in.apply(
ParDo.of(
new DoFn<String, String>() {
@@ -312,7 +312,7 @@ public class Regex {
this.valueGroup = valueGroup;
}
- public PCollection<KV<String, String>> apply(PCollection<String> in) {
+ public PCollection<KV<String, String>> expand(PCollection<String> in) {
return in.apply(
ParDo.of(
new DoFn<String, KV<String, String>>() {
@@ -354,7 +354,7 @@ public class Regex {
this.replacement = replacement;
}
- public PCollection<String> apply(PCollection<String> in) {
+ public PCollection<String> expand(PCollection<String> in) {
return in.apply(
ParDo.of(
new DoFn<String, String>() {
@@ -393,7 +393,7 @@ public class Regex {
this.replacement = replacement;
}
- public PCollection<String> apply(PCollection<String> in) {
+ public PCollection<String> expand(PCollection<String> in) {
return in.apply(
ParDo.of(
new DoFn<String, String>() {
@@ -434,7 +434,7 @@ public class Regex {
this.outputEmpty = outputEmpty;
}
- public PCollection<String> apply(PCollection<String> in) {
+ public PCollection<String> expand(PCollection<String> in) {
return in.apply(
ParDo.of(
new DoFn<String, String>() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index eca987a..7d4e630 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -143,7 +143,7 @@ public class Sample {
}
@Override
- public PCollection<T> apply(PCollection<T> in) {
+ public PCollection<T> expand(PCollection<T> in) {
PCollectionView<Iterable<T>> iterableView = in.apply(View.<T>asIterable());
return
in.getPipeline()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
index d21d100..faf402c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
@@ -56,7 +56,7 @@ public class Values<V> extends PTransform<PCollection<? extends KV<?, V>>,
private Values() { }
@Override
- public PCollection<V> apply(PCollection<? extends KV<?, V>> in) {
+ public PCollection<V> expand(PCollection<? extends KV<?, V>> in) {
return
in.apply("Values", MapElements.via(new SimpleFunction<KV<?, V>, V>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 5fafc0a..126679d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -251,7 +251,7 @@ public class View {
}
@Override
- public PCollectionView<List<T>> apply(PCollection<T> input) {
+ public PCollectionView<List<T>> expand(PCollection<T> input) {
return input.apply(CreatePCollectionView.<T, List<T>>of(PCollectionViews.listView(
input.getPipeline(), input.getWindowingStrategy(), input.getCoder())));
}
@@ -277,7 +277,7 @@ public class View {
}
@Override
- public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+ public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
return input.apply(CreatePCollectionView.<T, Iterable<T>>of(PCollectionViews.iterableView(
input.getPipeline(), input.getWindowingStrategy(), input.getCoder())));
}
@@ -334,7 +334,7 @@ public class View {
}
@Override
- public PCollectionView<T> apply(PCollection<T> input) {
+ public PCollectionView<T> expand(PCollection<T> input) {
return input.apply(CreatePCollectionView.<T, T>of(PCollectionViews.singletonView(
input.getPipeline(),
input.getWindowingStrategy(),
@@ -364,7 +364,7 @@ public class View {
}
@Override
- public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(
PCollectionViews.multimapView(
input.getPipeline(),
@@ -401,7 +401,7 @@ public class View {
}
@Override
- public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+ public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(
PCollectionViews.mapView(
input.getPipeline(),
@@ -439,7 +439,7 @@ public class View {
}
@Override
- public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
+ public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
return view;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index de28ecb..dd38006 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -111,7 +111,7 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
}
@Override
- public PCollection<KV<K, V>> apply(PCollection<V> in) {
+ public PCollection<KV<K, V>> expand(PCollection<V> in) {
PCollection<KV<K, V>> result =
in.apply("AddKeys", MapElements.via(new SimpleFunction<V, KV<K, V>>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index 64e7c45..387707b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -98,7 +98,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
}
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
return input.apply(
"AddTimestamps", ParDo.of(new AddTimestampsDoFn<T>(fn, allowedTimestampSkew)));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
index b760e2c..1d10da4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
@@ -87,7 +87,7 @@ public class CoGroupByKey<K> extends
private CoGroupByKey() { }
@Override
- public PCollection<KV<K, CoGbkResult>> apply(
+ public PCollection<KV<K, CoGbkResult>> expand(
KeyedPCollectionTuple<K> input) {
if (input.isEmpty()) {
throw new IllegalArgumentException(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 5607762..0c430d0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -445,7 +445,7 @@ public class Window {
}
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
WindowingStrategy<?, ?> outputStrategy =
getOutputStrategyInternal(input.getWindowingStrategy());
return PCollection.createPrimitiveOutputInternal(
@@ -517,7 +517,7 @@ public class Window {
*/
private static class Remerge<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
WindowingStrategy<?, ?> outputWindowingStrategy = getOutputWindowing(
input.getWindowingStrategy());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index ebd44bf..4d86c74 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -50,7 +50,7 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
}
@Override
- public PCollection<KV<K, V>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
// If the input has already had its windows merged, then the GBK that performed the merge
// will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 0c743f7..fea1554 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -212,7 +212,7 @@ public class PipelineTest {
private static class IdentityTransform<T extends PInput & POutput>
extends PTransform<T, T> {
@Override
- public T apply(T input) {
+ public T expand(T input) {
return input;
}
}
@@ -247,7 +247,7 @@ public class PipelineTest {
}
@Override
- public PCollection<T> apply(PCollectionTuple input) {
+ public PCollection<T> expand(PCollectionTuple input) {
return input.get(tag);
}
}
@@ -281,7 +281,7 @@ public class PipelineTest {
}
@Override
- public PCollectionTuple apply(PCollection<T> input) {
+ public PCollectionTuple expand(PCollection<T> input) {
return PCollectionTuple.of(tag, input);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 530d755..d7badab 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -370,7 +370,7 @@ public class CoderRegistryTest {
@Override
public PCollection<KV<String, MySerializableGeneric<String>>>
- apply(PCollection<String> input) {
+ expand(PCollection<String> input) {
return input.apply(ParDo.of(new OutputDoFn()));
}
}
@@ -435,7 +435,7 @@ public class CoderRegistryTest {
@Override
public PCollection<KV<String, MySerializableGeneric<T>>>
- apply(PCollection<String> input) {
+ expand(PCollection<String> input) {
return input.apply(ParDo.of(new OutputDoFn()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 5be5ff1..5a7c994 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -123,7 +123,7 @@ public class WriteTest {
}
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
return input
.apply(window)
.apply(ParDo.of(new AddArbitraryKey<T>()))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index ea43188..b0c17d8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -98,7 +98,7 @@ public class TransformHierarchyTest {
pcList,
new PTransform<PCollectionList<Long>, PCollection<Long>>() {
@Override
- public PCollection<Long> apply(PCollectionList<Long> input) {
+ public PCollection<Long> expand(PCollectionList<Long> input) {
return input.get(0);
}
});
@@ -130,7 +130,7 @@ public class TransformHierarchyTest {
pcList,
new PTransform<PCollectionList<Long>, PCollectionList<Long>>() {
@Override
- public PCollectionList<Long> apply(PCollectionList<Long> input) {
+ public PCollectionList<Long> expand(PCollectionList<Long> input) {
return appended;
}
});
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index a81fb1a..d70aa2f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -73,7 +73,7 @@ public class TransformTreeTest {
extends PTransform<PBegin, PCollectionList<String>> {
@Override
- public PCollectionList<String> apply(PBegin b) {
+ public PCollectionList<String> expand(PBegin b) {
// Composite transform: apply delegates to other transformations,
// here a Create transform.
PCollection<String> result = b.apply(Create.of("hello", "world"));
@@ -95,7 +95,7 @@ public class TransformTreeTest {
extends PTransform<PCollection<Integer>, PDone> {
@Override
- public PDone apply(PCollection<Integer> input) {
+ public PDone expand(PCollection<Integer> input) {
// Apply an operation so that this is a composite transform.
input.apply(Count.<Integer>perElement());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
index 311c8de..bb2877e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
@@ -181,7 +181,7 @@ public class FlatMapElementsTest implements Serializable {
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Void>>> {
@Override
- public PCollection<KV<K, Void>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KV<K, Void>> expand(PCollection<KV<K, V>> input) {
return input.apply(FlatMapElements.<KV<K, V>, KV<K, Void>>via(
new SimpleFunction<KV<K, V>, Iterable<KV<K, Void>>>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 535ffec..ebde110 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -306,7 +306,7 @@ public class GroupByKeyTest {
p.apply(
new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
- public PCollection<KV<String, Integer>> apply(PBegin input) {
+ public PCollection<KV<String, Integer>> expand(PBegin input) {
return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index 4a34c57..ac3444b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -271,7 +271,7 @@ public class MapElementsTest implements Serializable {
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Void>>> {
@Override
- public PCollection<KV<K, Void>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KV<K, Void>> expand(PCollection<KV<K, V>> input) {
return input.apply(MapElements.via(
new SimpleFunction<KV<K, V>, KV<K, Void>>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PTransformTest.java
index bfe8225..2b7caee 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PTransformTest.java
@@ -36,7 +36,7 @@ public class PTransformTest {
PTransform<PCollection<String>, PCollection<String>> transform =
new PTransform<PCollection<String>, PCollection<String>>() {
@Override
- public PCollection<String> apply(PCollection<String> begin) {
+ public PCollection<String> expand(PCollection<String> begin) {
throw new IllegalArgumentException("Should never be applied");
}
};
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 9755076..41e795e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -272,7 +272,7 @@ public class ParDoTest implements Serializable {
private static final TupleTag<Integer> BY3 = new TupleTag<Integer>("by3"){};
@Override
- public PCollectionTuple apply(PCollection<Integer> input) {
+ public PCollectionTuple expand(PCollection<Integer> input) {
PCollection<Integer> by2 = input.apply("Filter2s", ParDo.of(new FilterFn(2)));
PCollection<Integer> by3 = input.apply("Filter3s", ParDo.of(new FilterFn(3)));
return PCollectionTuple.of(BY2, by2).and(BY3, by3);
@@ -840,7 +840,7 @@ public class ParDoTest implements Serializable {
.apply(Create.of(inputs))
.apply("CustomTransform", new PTransform<PCollection<Integer>, PCollection<String>>() {
@Override
- public PCollection<String> apply(PCollection<Integer> input) {
+ public PCollection<String> expand(PCollection<Integer> input) {
return input.apply(ParDo.of(new TestDoFn()));
}
});
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index fe2d125..3bf63fd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -1360,7 +1360,7 @@ public class ViewTest implements Serializable {
.apply(
new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
- public PCollection<KV<String, Integer>> apply(PBegin input) {
+ public PCollection<KV<String, Integer>> expand(PBegin input) {
return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index 7630779..feff333 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -46,7 +46,7 @@ public class DisplayDataEvaluatorTest implements Serializable {
PTransform<? super PCollection<String>, ? super POutput> myTransform =
new PTransform<PCollection<String>, POutput> () {
@Override
- public PCollection<String> apply(PCollection<String> input) {
+ public PCollection<String> expand(PCollection<String> input) {
return input.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
index f7f8d40..f892153 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
@@ -78,7 +78,7 @@ public class DisplayDataMatchersTest {
DisplayData data = DisplayData.from(new PTransform<PCollection<String>, PCollection<String>>() {
@Override
- public PCollection<String> apply(PCollection<String> input) {
+ public PCollection<String> expand(PCollection<String> input) {
throw new IllegalArgumentException("Should never be applied");
}
@@ -133,7 +133,7 @@ public class DisplayDataMatchersTest {
assertFalse(matcher.matches(DisplayData.from(
new PTransform<PCollection<String>, PCollection<String>>(){
@Override
- public PCollection<String> apply(PCollection<String> input) {
+ public PCollection<String> expand(PCollection<String> input) {
throw new IllegalArgumentException("Should never be applied");
}
})));
@@ -206,7 +206,7 @@ public class DisplayDataMatchersTest {
}
@Override
- public PCollection<String> apply(PCollection<String> input) {
+ public PCollection<String> expand(PCollection<String> input) {
throw new IllegalArgumentException("Should never be applied");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index fccd031..f5c1e73 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -111,7 +111,7 @@ public class DisplayDataTest implements Serializable {
Instant startTime = defaultStartTime;
@Override
- public PCollection<String> apply(PCollection<String> begin) {
+ public PCollection<String> expand(PCollection<String> begin) {
throw new IllegalArgumentException("Should never be applied");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index ab208dd..d4fab17 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -70,7 +70,7 @@ public class WindowingTest implements Serializable {
this.windowFn = windowFn;
}
@Override
- public PCollection<String> apply(PCollection<String> in) {
+ public PCollection<String> expand(PCollection<String> in) {
return in.apply("Window",
Window.<String>into(windowFn)
.withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
index 042e9e3..1ac176b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
@@ -69,13 +69,13 @@ public class StringUtilsTest {
private class EmbeddedPTransform extends PTransform<PBegin, PDone> {
@Override
- public PDone apply(PBegin begin) {
+ public PDone expand(PBegin begin) {
throw new IllegalArgumentException("Should never be applied");
}
private class Bound extends PTransform<PBegin, PDone> {
@Override
- public PDone apply(PBegin begin) {
+ public PDone expand(PBegin begin) {
throw new IllegalArgumentException("Should never be applied");
}
}
@@ -129,7 +129,7 @@ public class StringUtilsTest {
AnonymousClass anonymousClassObj = new AnonymousClass() {
class NamedInnerClass extends PTransform<PBegin, PDone> {
@Override
- public PDone apply(PBegin begin) {
+ public PDone expand(PBegin begin) {
throw new IllegalArgumentException("Should never be applied");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
index 4000e5d..e5f2019 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
@@ -48,7 +48,7 @@ public class PDoneTest {
*/
static class EmptyTransform extends PTransform<PBegin, PDone> {
@Override
- public PDone apply(PBegin begin) {
+ public PDone expand(PBegin begin) {
return PDone.in(begin.getPipeline());
}
}
@@ -64,7 +64,7 @@ public class PDoneTest {
}
@Override
- public PDone apply(PBegin begin) {
+ public PDone expand(PBegin begin) {
return
begin
.apply(Create.of(LINES))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
index 3bd9afa..d1b4d07 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
@@ -74,7 +74,7 @@ public class SortValues<PrimaryKeyT, SecondaryKeyT, ValueT>
}
@Override
- public PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> apply(
+ public PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> expand(
PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> input) {
return input.apply(
ParDo.of(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 8bfbd53..f99ca78 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -631,7 +631,7 @@ public class BigQueryIO {
}
@Override
- public PCollection<TableRow> apply(PBegin input) {
+ public PCollection<TableRow> expand(PBegin input) {
String uuid = randomUUIDString();
final String jobIdToken = "beam_job_" + uuid;
@@ -840,7 +840,7 @@ public class BigQueryIO {
}
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
TupleTag<T> mainOutput = new TupleTag<>();
TupleTag<Void> cleanupSignal = new TupleTag<>();
PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>())
@@ -1846,7 +1846,7 @@ public class BigQueryIO {
}
@Override
- public PDone apply(PCollection<TableRow> input) {
+ public PDone expand(PCollection<TableRow> input) {
Pipeline p = input.getPipeline();
BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
BigQueryServices bqServices = getBigQueryServices();
@@ -2803,7 +2803,7 @@ public class BigQueryIO {
}
@Override
- public PDone apply(PCollection<TableRow> input) {
+ public PDone expand(PCollection<TableRow> input) {
// A naive implementation would be to simply stream data directly to BigQuery.
// However, this could occasionally lead to duplicated data, e.g., when
// a VM that runs this code is restarted and the code is re-run.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 1ee9253..a83784b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -286,7 +286,7 @@ public class BigtableIO {
}
@Override
- public PCollection<Row> apply(PBegin input) {
+ public PCollection<Row> expand(PBegin input) {
BigtableSource source =
new BigtableSource(new SerializableFunction<PipelineOptions, BigtableService>() {
@Override
@@ -500,7 +500,7 @@ public class BigtableIO {
}
@Override
- public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
+ public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
input.apply(ParDo.of(new BigtableWriterFn(tableId,
new SerializableFunction<PipelineOptions, BigtableService>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index d1a9a67..1e8271c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -411,7 +411,7 @@ public class DatastoreV1 {
}
@Override
- public PCollection<Entity> apply(PBegin input) {
+ public PCollection<Entity> expand(PBegin input) {
V1Options v1Options = V1Options.from(getProjectId(), getQuery(),
getNamespace());
@@ -779,7 +779,7 @@ public class DatastoreV1 {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
input.apply("Convert to Mutation", MapElements.via(mutationFn))
.apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(projectId)));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 9644a65..c625287 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -296,7 +296,7 @@ public class JdbcIO {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
return input
.apply(Create.of(getQuery()))
.apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder())
@@ -417,7 +417,7 @@ public class JdbcIO {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
input.apply(ParDo.of(new WriteFn<T>(this)));
return PDone.in(input.getPipeline());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 00b91ad..24fa67d 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -135,7 +135,7 @@ public class JmsIO {
}
@Override
- public PCollection<JmsRecord> apply(PBegin input) {
+ public PCollection<JmsRecord> expand(PBegin input) {
// handles unbounded source to bounded conversion if maxNumRecords is set.
Unbounded<JmsRecord> unbounded = org.apache.beam.sdk.io.Read.from(createSource());
@@ -434,7 +434,7 @@ public class JmsIO {
}
@Override
- public PDone apply(PCollection<String> input) {
+ public PDone expand(PCollection<String> input) {
input.apply(ParDo.of(new JmsWriter(connectionFactory, queue, topic)));
return PDone.in(input.getPipeline());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index c87d12b..735b8e7 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -445,7 +445,7 @@ public class KafkaIO {
}
@Override
- public PCollection<KafkaRecord<K, V>> apply(PBegin input) {
+ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
// Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
Unbounded<KafkaRecord<K, V>> unbounded =
org.apache.beam.sdk.io.Read.from(makeSource());
@@ -544,9 +544,9 @@ public class KafkaIO {
}
@Override
- public PCollection<KV<K, V>> apply(PBegin begin) {
+ public PCollection<KV<K, V>> expand(PBegin begin) {
return typedRead
- .apply(begin)
+ .expand(begin)
.apply("Remove Kafka Metadata",
ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
@ProcessElement
@@ -1244,7 +1244,7 @@ public class KafkaIO {
}
@Override
- public PDone apply(PCollection<KV<K, V>> input) {
+ public PDone expand(PCollection<KV<K, V>> input) {
input.apply(ParDo.of(new KafkaWriter<K, V>(
topic, keyCoder, valueCoder, producerConfig, producerFactoryFnOpt)));
return PDone.in(input.getPipeline());
@@ -1311,7 +1311,7 @@ public class KafkaIO {
}
@Override
- public PDone apply(PCollection<V> input) {
+ public PDone expand(PCollection<V> input) {
return input
.apply("Kafka values with default key",
MapElements.via(new SimpleFunction<V, KV<Void, V>>() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index a482df4..b65e671 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -308,7 +308,7 @@ public class MongoDbGridFSIO {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
final BoundedGridFSSource source = new BoundedGridFSSource(this, null);
org.apache.beam.sdk.io.Read.Bounded<ObjectId> objectIds =
org.apache.beam.sdk.io.Read.from(source);
@@ -621,7 +621,7 @@ public class MongoDbGridFSIO {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
input.apply(ParDo.of(new GridFsWriteFn<T>(this)));
return PDone.in(input.getPipeline());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 70239e6..f539431 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -160,7 +160,7 @@ public class MongoDbIO {
}
@Override
- public PCollection<Document> apply(PBegin input) {
+ public PCollection<Document> expand(PBegin input) {
return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedMongoDbSource(this)));
}
@@ -441,7 +441,7 @@ public class MongoDbIO {
}
@Override
- public PDone apply(PCollection<Document> input) {
+ public PDone expand(PCollection<Document> input) {
input.apply(ParDo.of(new WriteFn(this)));
return PDone.in(input.getPipeline());
}
[3/3] incubator-beam git commit: Rename PTransform.apply to
PTransform.expand
Posted by ke...@apache.org.
Rename PTransform.apply to PTransform.expand
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d607b5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d607b5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d607b5a
Branch: refs/heads/master
Commit: 4d607b5a594bbf2be76626200d989a9e65ba3da9
Parents: 04a41ee
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 13:33:04 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:35:36 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/examples/WordCount.java | 2 +-
.../beam/examples/complete/AutoComplete.java | 6 +--
.../apache/beam/examples/complete/TfIdf.java | 6 +--
.../examples/complete/TopWikipediaSessions.java | 6 +--
.../examples/complete/TrafficMaxLaneFlow.java | 4 +-
.../beam/examples/complete/TrafficRoutes.java | 4 +-
.../examples/cookbook/BigQueryTornadoes.java | 2 +-
.../cookbook/CombinePerKeyExamples.java | 2 +-
.../beam/examples/cookbook/FilterExamples.java | 2 +-
.../examples/cookbook/MaxPerKeyExamples.java | 2 +-
.../beam/examples/cookbook/TriggerExample.java | 4 +-
.../examples/complete/AutoCompleteTest.java | 2 +-
.../beam/examples/complete/game/GameStats.java | 2 +-
.../examples/complete/game/LeaderBoard.java | 4 +-
.../beam/examples/complete/game/UserScore.java | 2 +-
.../complete/game/utils/WriteToBigQuery.java | 2 +-
.../game/utils/WriteWindowedToBigQuery.java | 2 +-
.../apache/beam/runners/apex/ApexRunner.java | 10 ++--
.../apache/beam/runners/core/AssignWindows.java | 2 +-
.../core/GroupByKeyViaGroupByKeyOnly.java | 8 +--
.../beam/runners/core/SplittableParDo.java | 6 +--
.../core/UnboundedReadFromBoundedSource.java | 2 +-
.../beam/runners/direct/DirectGroupByKey.java | 6 +--
.../runners/direct/ForwardingPTransform.java | 6 +--
.../direct/ParDoMultiOverrideFactory.java | 4 +-
.../ParDoSingleViaMultiOverrideFactory.java | 2 +-
.../direct/TestStreamEvaluatorFactory.java | 2 +-
.../runners/direct/ViewEvaluatorFactory.java | 4 +-
.../direct/WriteWithShardingFactory.java | 4 +-
.../runners/direct/CommittedResultTest.java | 2 +-
.../runners/direct/DirectGraphVisitorTest.java | 2 +-
.../direct/ForwardingPTransformTest.java | 4 +-
.../direct/KeyedPValueTrackingVisitorTest.java | 4 +-
.../beam/runners/flink/examples/TFIDF.java | 6 +--
.../beam/runners/flink/examples/WordCount.java | 2 +-
.../flink/examples/streaming/AutoComplete.java | 6 +--
.../apache/beam/runners/flink/FlinkRunner.java | 14 +++---
.../beam/runners/dataflow/DataflowRunner.java | 51 ++++++++++----------
.../dataflow/internal/AssignWindows.java | 2 +-
.../DataflowUnboundedReadFromBoundedSource.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 8 +--
.../runners/dataflow/DataflowRunnerTest.java | 2 +-
.../transforms/DataflowGroupByKeyTest.java | 2 +-
.../dataflow/transforms/DataflowViewTest.java | 2 +-
.../beam/runners/spark/examples/WordCount.java | 2 +-
.../apache/beam/runners/spark/io/ConsoleIO.java | 2 +-
.../beam/runners/spark/io/CreateStream.java | 2 +-
.../beam/runners/spark/io/hadoop/HadoopIO.java | 4 +-
.../translation/StorageLevelPTransform.java | 2 +-
.../util/SinglePrimitiveOutputPTransform.java | 2 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 4 +-
.../sdk/io/BoundedReadFromUnboundedSource.java | 2 +-
.../org/apache/beam/sdk/io/CountingInput.java | 4 +-
.../java/org/apache/beam/sdk/io/PubsubIO.java | 4 +-
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 2 +-
.../beam/sdk/io/PubsubUnboundedSource.java | 2 +-
.../main/java/org/apache/beam/sdk/io/Read.java | 4 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 4 +-
.../main/java/org/apache/beam/sdk/io/Write.java | 2 +-
.../apache/beam/sdk/runners/PipelineRunner.java | 2 +-
.../apache/beam/sdk/testing/GatherAllPanes.java | 2 +-
.../org/apache/beam/sdk/testing/PAssert.java | 12 ++---
.../org/apache/beam/sdk/testing/TestStream.java | 2 +-
.../beam/sdk/transforms/ApproximateUnique.java | 4 +-
.../org/apache/beam/sdk/transforms/Combine.java | 10 ++--
.../org/apache/beam/sdk/transforms/Count.java | 2 +-
.../org/apache/beam/sdk/transforms/Create.java | 4 +-
.../apache/beam/sdk/transforms/Distinct.java | 4 +-
.../org/apache/beam/sdk/transforms/Filter.java | 2 +-
.../beam/sdk/transforms/FlatMapElements.java | 2 +-
.../org/apache/beam/sdk/transforms/Flatten.java | 4 +-
.../apache/beam/sdk/transforms/GroupByKey.java | 2 +-
.../org/apache/beam/sdk/transforms/Keys.java | 2 +-
.../org/apache/beam/sdk/transforms/KvSwap.java | 2 +-
.../org/apache/beam/sdk/transforms/Latest.java | 4 +-
.../apache/beam/sdk/transforms/MapElements.java | 2 +-
.../apache/beam/sdk/transforms/PTransform.java | 4 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 4 +-
.../apache/beam/sdk/transforms/Partition.java | 2 +-
.../org/apache/beam/sdk/transforms/Regex.java | 14 +++---
.../org/apache/beam/sdk/transforms/Sample.java | 2 +-
.../org/apache/beam/sdk/transforms/Values.java | 2 +-
.../org/apache/beam/sdk/transforms/View.java | 12 ++---
.../apache/beam/sdk/transforms/WithKeys.java | 2 +-
.../beam/sdk/transforms/WithTimestamps.java | 2 +-
.../beam/sdk/transforms/join/CoGroupByKey.java | 2 +-
.../beam/sdk/transforms/windowing/Window.java | 4 +-
.../org/apache/beam/sdk/util/Reshuffle.java | 2 +-
.../java/org/apache/beam/sdk/PipelineTest.java | 6 +--
.../beam/sdk/coders/CoderRegistryTest.java | 4 +-
.../java/org/apache/beam/sdk/io/WriteTest.java | 2 +-
.../sdk/runners/TransformHierarchyTest.java | 4 +-
.../beam/sdk/runners/TransformTreeTest.java | 4 +-
.../sdk/transforms/FlatMapElementsTest.java | 2 +-
.../beam/sdk/transforms/GroupByKeyTest.java | 2 +-
.../beam/sdk/transforms/MapElementsTest.java | 2 +-
.../beam/sdk/transforms/PTransformTest.java | 2 +-
.../apache/beam/sdk/transforms/ParDoTest.java | 4 +-
.../apache/beam/sdk/transforms/ViewTest.java | 2 +-
.../display/DisplayDataEvaluatorTest.java | 2 +-
.../display/DisplayDataMatchersTest.java | 6 +--
.../sdk/transforms/display/DisplayDataTest.java | 2 +-
.../sdk/transforms/windowing/WindowingTest.java | 2 +-
.../apache/beam/sdk/util/StringUtilsTest.java | 6 +--
.../org/apache/beam/sdk/values/PDoneTest.java | 4 +-
.../beam/sdk/extensions/sorter/SortValues.java | 2 +-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 8 +--
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 4 +-
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 4 +-
.../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 4 +-
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 4 +-
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 10 ++--
.../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 4 +-
.../apache/beam/sdk/io/mongodb/MongoDbIO.java | 4 +-
114 files changed, 238 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index 5be0ddc..d4da542 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -126,7 +126,7 @@ public class WordCount {
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
- public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+ public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index c556e3f..31b06c9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -113,7 +113,7 @@ public class AutoComplete {
}
@Override
- public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) {
+ public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) {
PCollection<CompletionCandidate> candidates = input
// First count how often each token appears.
.apply(new Count.PerElement<String>())
@@ -154,7 +154,7 @@ public class AutoComplete {
}
@Override
- public PCollection<KV<String, List<CompletionCandidate>>> apply(
+ public PCollection<KV<String, List<CompletionCandidate>>> expand(
PCollection<CompletionCandidate> input) {
return input
// For each completion candidate, map it to all prefixes.
@@ -209,7 +209,7 @@ public class AutoComplete {
}
@Override
- public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
+ public PCollectionList<KV<String, List<CompletionCandidate>>> expand(
PCollection<CompletionCandidate> input) {
if (minPrefix > 10) {
// Base case, partitioning to return the output in the expected format.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index edf48e7..ea015ae 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -159,7 +159,7 @@ public class TfIdf {
}
@Override
- public PCollection<KV<URI, String>> apply(PBegin input) {
+ public PCollection<KV<URI, String>> expand(PBegin input) {
Pipeline pipeline = input.getPipeline();
// Create one TextIO.Read transform for each document
@@ -200,7 +200,7 @@ public class TfIdf {
public ComputeTfIdf() { }
@Override
- public PCollection<KV<String, KV<URI, Double>>> apply(
+ public PCollection<KV<String, KV<URI, Double>>> expand(
PCollection<KV<URI, String>> uriToContent) {
// Compute the total number of documents, and
@@ -390,7 +390,7 @@ public class TfIdf {
}
@Override
- public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
+ public PDone expand(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
return wordToUriAndTfIdf
.apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
@ProcessElement
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index d57cc3a..df7f81e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -99,7 +99,7 @@ public class TopWikipediaSessions {
static class ComputeSessions
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
- public PCollection<KV<String, Long>> apply(PCollection<String> actions) {
+ public PCollection<KV<String, Long>> expand(PCollection<String> actions) {
return actions
.apply(Window.<String>into(Sessions.withGapDuration(Duration.standardHours(1))))
@@ -113,7 +113,7 @@ public class TopWikipediaSessions {
private static class TopPerMonth
extends PTransform<PCollection<KV<String, Long>>, PCollection<List<KV<String, Long>>>> {
@Override
- public PCollection<List<KV<String, Long>>> apply(PCollection<KV<String, Long>> sessions) {
+ public PCollection<List<KV<String, Long>>> expand(PCollection<KV<String, Long>> sessions) {
return sessions
.apply(Window.<KV<String, Long>>into(CalendarWindows.months(1)))
@@ -154,7 +154,7 @@ public class TopWikipediaSessions {
}
@Override
- public PCollection<String> apply(PCollection<TableRow> input) {
+ public PCollection<String> expand(PCollection<TableRow> input) {
return input
.apply(ParDo.of(new ExtractUserAndTimestamp()))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index 0c367d4..c1032b9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -267,7 +267,7 @@ public class TrafficMaxLaneFlow {
static class MaxLaneFlow
extends PTransform<PCollection<KV<String, LaneInfo>>, PCollection<TableRow>> {
@Override
- public PCollection<TableRow> apply(PCollection<KV<String, LaneInfo>> flowInfo) {
+ public PCollection<TableRow> expand(PCollection<KV<String, LaneInfo>> flowInfo) {
// stationId, LaneInfo => stationId + max lane flow info
PCollection<KV<String, LaneInfo>> flowMaxes =
flowInfo.apply(Combine.<String, LaneInfo>perKey(
@@ -289,7 +289,7 @@ public class TrafficMaxLaneFlow {
}
@Override
- public PCollection<String> apply(PBegin begin) {
+ public PCollection<String> expand(PBegin begin) {
return begin
.apply(TextIO.Read.from(inputFile))
.apply(ParDo.of(new ExtractTimestamps()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index 14cee4d..9b5d577 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -274,7 +274,7 @@ public class TrafficRoutes {
static class TrackSpeed extends
PTransform<PCollection<KV<String, StationSpeed>>, PCollection<TableRow>> {
@Override
- public PCollection<TableRow> apply(PCollection<KV<String, StationSpeed>> stationSpeed) {
+ public PCollection<TableRow> expand(PCollection<KV<String, StationSpeed>> stationSpeed) {
// Apply a GroupByKey transform to collect a list of all station
// readings for a given route.
PCollection<KV<String, Iterable<StationSpeed>>> timeGroup = stationSpeed.apply(
@@ -299,7 +299,7 @@ public class TrafficRoutes {
}
@Override
- public PCollection<String> apply(PBegin begin) {
+ public PCollection<String> expand(PBegin begin) {
return begin
.apply(TextIO.Read.from(inputFile))
.apply(ParDo.of(new ExtractTimestamps()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index a4c1a6b..14d0f58 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -108,7 +108,7 @@ public class BigQueryTornadoes {
static class CountTornadoes
extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
@Override
- public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+ public PCollection<TableRow> expand(PCollection<TableRow> rows) {
// row... => month...
PCollection<Integer> tornadoes = rows.apply(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 93eee15..29655ea 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -125,7 +125,7 @@ public class CombinePerKeyExamples {
static class PlaysForWord
extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
@Override
- public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+ public PCollection<TableRow> expand(PCollection<TableRow> rows) {
// row... => <word, play_name> ...
PCollection<KV<String, String>> words = rows.apply(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index 6e6452c..fb6b507 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -153,7 +153,7 @@ public class FilterExamples {
@Override
- public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+ public PCollection<TableRow> expand(PCollection<TableRow> rows) {
// Extract the mean_temp from each row.
PCollection<Double> meanTemps = rows.apply(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
index abc10f3..eabc42b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -100,7 +100,7 @@ public class MaxPerKeyExamples {
static class MaxMeanTemp
extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
@Override
- public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+ public PCollection<TableRow> expand(PCollection<TableRow> rows) {
// row... => <month, mean_temp> ...
PCollection<KV<Integer, Double>> temps = rows.apply(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index d965d4a..bf3afca 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -171,7 +171,7 @@ public class TriggerExample {
}
@Override
- public PCollectionList<TableRow> apply(PCollection<KV<String, Integer>> flowInfo) {
+ public PCollectionList<TableRow> expand(PCollection<KV<String, Integer>> flowInfo) {
// Concept #1: The default triggering behavior
// By default Beam uses a trigger which fires when the watermark has passed the end of the
@@ -332,7 +332,7 @@ public class TriggerExample {
}
@Override
- public PCollection<TableRow> apply(PCollection<KV<String, Integer>> flowInfo) {
+ public PCollection<TableRow> expand(PCollection<KV<String, Integer>> flowInfo) {
PCollection<KV<String, Iterable<Integer>>> flowPerFreeway = flowInfo
.apply(GroupByKey.<String, Integer>create());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
index 5dbfa70..d7d4dc6 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
@@ -168,7 +168,7 @@ public class AutoCompleteTest implements Serializable {
private static class ReifyTimestamps<T>
extends PTransform<PCollection<TimestampedValue<T>>, PCollection<T>> {
@Override
- public PCollection<T> apply(PCollection<TimestampedValue<T>> input) {
+ public PCollection<T> expand(PCollection<TimestampedValue<T>> input) {
return input.apply(ParDo.of(new DoFn<TimestampedValue<T>, T>() {
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index f352252..6ad6a23 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -110,7 +110,7 @@ public class GameStats extends LeaderBoard {
private static final double SCORE_WEIGHT = 2.5;
@Override
- public PCollection<KV<String, Integer>> apply(PCollection<KV<String, Integer>> userScores) {
+ public PCollection<KV<String, Integer>> expand(PCollection<KV<String, Integer>> userScores) {
// Get the sum of scores for each user.
PCollection<KV<String, Integer>> sumScores = userScores
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 3275fa0..519bd5f 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -234,7 +234,7 @@ public class LeaderBoard extends HourlyTeamScore {
}
@Override
- public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> infos) {
+ public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> infos) {
return infos.apply("LeaderboardTeamFixedWindows",
Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
// We will get early (speculative) results as well as cumulative
@@ -267,7 +267,7 @@ public class LeaderBoard extends HourlyTeamScore {
}
@Override
- public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> input) {
+ public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> input) {
return input.apply("LeaderboardUserGlobalWindow",
Window.<GameActionInfo>into(new GlobalWindows())
// Get periodic results every ten minutes.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 2bca7fc..cb81a7e 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -160,7 +160,7 @@ public class UserScore {
}
@Override
- public PCollection<KV<String, Integer>> apply(
+ public PCollection<KV<String, Integer>> expand(
PCollection<GameActionInfo> gameInfo) {
return gameInfo
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index 89fc271..1f33915 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -118,7 +118,7 @@ public class WriteToBigQuery<InputT>
}
@Override
- public PDone apply(PCollection<InputT> teamAndScore) {
+ public PDone expand(PCollection<InputT> teamAndScore) {
return teamAndScore
.apply("ConvertToRow", ParDo.of(new BuildRowFn()))
.apply(BigQueryIO.Write
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index 4f2e719..c32289f 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -60,7 +60,7 @@ public class WriteWindowedToBigQuery<T>
}
@Override
- public PDone apply(PCollection<T> teamAndScore) {
+ public PDone expand(PCollection<T> teamAndScore) {
return teamAndScore
.apply("ConvertToRow", ParDo.of(new BuildRowFn()))
.apply(BigQueryIO.Write
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 5ce4fef..9507fb9 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -165,7 +165,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
WindowingStrategy<?, ?> outputStrategy =
wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
@@ -226,7 +226,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
@Override
- public PCollectionView<ViewT> apply(PCollection<List<ElemT>> input) {
+ public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
return view;
}
}
@@ -252,7 +252,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
@Override
- public PCollectionView<OutputT> apply(PCollection<InputT> input) {
+ public PCollectionView<OutputT> expand(PCollection<InputT> input) {
PCollection<OutputT> combined = input
.apply(Combine.globally(transform.getCombineFn())
.withoutDefaults().withFanout(transform.getFanout()));
@@ -282,7 +282,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
@Override
- public PCollectionView<T> apply(PCollection<T> input) {
+ public PCollectionView<T> expand(PCollection<T> input) {
Combine.Globally<T, T> combine = Combine
.globally(new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
if (!transform.hasDefaultValue()) {
@@ -335,7 +335,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
@Override
- public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+ public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(input.getPipeline(),
input.getWindowingStrategy(), input.getCoder());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
index f2387f5..375932a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
@@ -40,7 +40,7 @@ public class AssignWindows<T, W extends BoundedWindow>
}
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
index 43047ca..694c5eb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
@@ -77,7 +77,7 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
}
@Override
- public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
return input
@@ -109,7 +109,7 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
- public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KV<K, Iterable<WindowedValue<V>>>> expand(PCollection<KV<K, V>> input) {
return PCollection.createPrimitiveOutputInternal(
input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
}
@@ -128,7 +128,7 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
PCollection<KV<K, Iterable<WindowedValue<V>>>>,
PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
@Override
- public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
+ public PCollection<KV<K, Iterable<WindowedValue<V>>>> expand(
PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
return input
.apply(
@@ -225,7 +225,7 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
}
@Override
- public PCollection<KV<K, Iterable<V>>> apply(
+ public PCollection<KV<K, Iterable<V>>> expand(
PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
@SuppressWarnings("unchecked")
KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 580e842..0bf882b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -110,7 +110,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
}
@Override
- public PCollectionTuple apply(PCollection<InputT> input) {
+ public PCollectionTuple expand(PCollection<InputT> input) {
return applyTyped(input);
}
@@ -179,7 +179,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
public static class GBKIntoKeyedWorkItems<KeyT, InputT>
extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
@Override
- public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
+ public PCollection<KeyedWorkItem<KeyT, InputT>> expand(PCollection<KV<KeyT, InputT>> input) {
return PCollection.createPrimitiveOutputInternal(
input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
}
@@ -247,7 +247,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
}
@Override
- public PCollectionTuple apply(
+ public PCollectionTuple expand(
PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
input) {
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 29dc57e..f3f93e1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -88,7 +88,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
return input.getPipeline().apply(
Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 21776e7..405d913 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -45,7 +45,7 @@ class DirectGroupByKey<K, V>
}
@Override
- public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
@SuppressWarnings("unchecked")
KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
@@ -79,7 +79,7 @@ class DirectGroupByKey<K, V>
static final class DirectGroupByKeyOnly<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>> {
@Override
- public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> input) {
return PCollection.createPrimitiveOutputInternal(
input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
}
@@ -126,7 +126,7 @@ class DirectGroupByKey<K, V>
}
@Override
- public PCollection<KV<K, Iterable<V>>> apply(PCollection<KeyedWorkItem<K, V>> input) {
+ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KeyedWorkItem<K, V>> input) {
return PCollection.createPrimitiveOutputInternal(
input.getPipeline(), outputWindowingStrategy, input.isBounded());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
index 77311c2..97c0983 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
@@ -28,15 +28,15 @@ import org.apache.beam.sdk.values.TypedPValue;
/**
* A base class for implementing {@link PTransform} overrides, which behave identically to the
* delegate transform but with overridden methods. Implementors are required to implement
- * {@link #delegate()}, which returns the object to forward calls to, and {@link #apply(PInput)}.
+ * {@link #delegate()}, which returns the object to forward calls to, and {@link #expand(PInput)}.
*/
public abstract class ForwardingPTransform<InputT extends PInput, OutputT extends POutput>
extends PTransform<InputT, OutputT> {
protected abstract PTransform<InputT, OutputT> delegate();
@Override
- public OutputT apply(InputT input) {
- return delegate().apply(input);
+ public OutputT expand(InputT input) {
+ return delegate().expand(input);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 9c9256d..8c96e9b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -73,7 +73,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
}
@Override
- public PCollectionTuple apply(PCollection<KV<K, InputT>> input) {
+ public PCollectionTuple expand(PCollection<KV<K, InputT>> input) {
PCollectionTuple outputs = input
.apply("Group by key", GroupByKey.<K, InputT>create())
@@ -106,7 +106,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
return underlyingParDo.getDefaultOutputCoder(originalInput, output);
}
- public PCollectionTuple apply(PCollection<? extends KV<K, Iterable<InputT>>> input) {
+ public PCollectionTuple expand(PCollection<? extends KV<K, Iterable<InputT>>> input) {
PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
index 7f2de66..10530bb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
@@ -49,7 +49,7 @@ class ParDoSingleViaMultiOverrideFactory<InputT, OutputT>
}
@Override
- public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+ public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
// Output tags for ParDo need only be unique up to applied transform
TupleTag<OutputT> mainOutputTag = new TupleTag<OutputT>(MAIN_OUTPUT_TAG);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 9df7cdc..3601dbc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -170,7 +170,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
PipelineRunner<?> runner = input.getPipeline().getRunner();
checkState(
runner instanceof DirectRunner,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index b92ade1..460b1c2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -115,7 +115,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
+ public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
return input.apply(WithKeys.<Void, ElemT>of((Void) null))
.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
.apply(GroupByKey.<Void, ElemT>create())
@@ -145,7 +145,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public PCollectionView<ViewT> apply(PCollection<Iterable<ElemT>> input) {
+ public PCollectionView<ViewT> expand(PCollection<Iterable<ElemT>> input) {
return og.getView();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index cf535cf..3c88337 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -66,7 +66,7 @@ class WriteWithShardingFactory<InputT>
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
checkArgument(IsBounded.BOUNDED == input.isBounded(),
"%s can only be applied to a Bounded PCollection",
getClass().getSimpleName());
@@ -92,7 +92,7 @@ class WriteWithShardingFactory<InputT>
// without adding a new Write Transform Node, which would be overwritten the same way, leading
// to an infinite recursion. We cannot modify the number of shards, because that is determined
// at runtime.
- return original.apply(resharded);
+ return original.expand(resharded);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index 00dca20..c6986c0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -52,7 +52,7 @@ public class CommittedResultTest implements Serializable {
private transient AppliedPTransform<?, ?, ?> transform =
AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() {
@Override
- public PDone apply(PBegin begin) {
+ public PDone expand(PBegin begin) {
throw new IllegalArgumentException("Should never be applied");
}
});
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index 5ad278b..b88c9a4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -213,7 +213,7 @@ public class DirectGraphVisitorTest implements Serializable {
transformed.apply(
new PTransform<PInput, PDone>() {
@Override
- public PDone apply(PInput input) {
+ public PDone expand(PInput input) {
return PDone.in(input.getPipeline());
}
});
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
index c75adaa..6860a58 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
@@ -66,8 +66,8 @@ public class ForwardingPTransformTest {
PCollection<Integer> collection = mock(PCollection.class);
@SuppressWarnings("unchecked")
PCollection<String> output = mock(PCollection.class);
- when(delegate.apply(collection)).thenReturn(output);
- PCollection<String> result = forwarding.apply(collection);
+ when(delegate.expand(collection)).thenReturn(output);
+ PCollection<String> result = forwarding.expand(collection);
assertThat(result, equalTo(output));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
index cf65936..0852cd3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -163,7 +163,7 @@ public class KeyedPValueTrackingVisitorTest {
private static class PrimitiveKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> {
@Override
- public PCollection<K> apply(PCollection<K> input) {
+ public PCollection<K> expand(PCollection<K> input) {
return PCollection.<K>createPrimitiveOutputInternal(
input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
.setCoder(input.getCoder());
@@ -172,7 +172,7 @@ public class KeyedPValueTrackingVisitorTest {
private static class CompositeKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> {
@Override
- public PCollection<K> apply(PCollection<K> input) {
+ public PCollection<K> expand(PCollection<K> input) {
return input.apply(new PrimitiveKeyer<K>()).apply(ParDo.of(new IdentityFn<K>()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index b946d98..89e261b 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -176,7 +176,7 @@ public class TFIDF {
}
@Override
- public PCollection<KV<URI, String>> apply(PBegin input) {
+ public PCollection<KV<URI, String>> expand(PBegin input) {
Pipeline pipeline = input.getPipeline();
// Create one TextIO.Read transform for each document
@@ -219,7 +219,7 @@ public class TFIDF {
public ComputeTfIdf() { }
@Override
- public PCollection<KV<String, KV<URI, Double>>> apply(
+ public PCollection<KV<String, KV<URI, Double>>> expand(
PCollection<KV<URI, String>> uriToContent) {
// Compute the total number of documents, and
@@ -419,7 +419,7 @@ public class TFIDF {
}
@Override
- public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
+ public PDone expand(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
return wordToUriAndTfIdf
.apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
private static final long serialVersionUID = 0;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index c816442..b6b3c1a 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -72,7 +72,7 @@ public class WordCount {
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
- public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+ public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 1e0c3ac..3405981 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -82,7 +82,7 @@ public class AutoComplete {
}
@Override
- public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) {
+ public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) {
PCollection<CompletionCandidate> candidates = input
// First count how often each token appears.
.apply(new Count.PerElement<String>())
@@ -129,7 +129,7 @@ public class AutoComplete {
}
@Override
- public PCollection<KV<String, List<CompletionCandidate>>> apply(
+ public PCollection<KV<String, List<CompletionCandidate>>> expand(
PCollection<CompletionCandidate> input) {
return input
// For each completion candidate, map it to all prefixes.
@@ -192,7 +192,7 @@ public class AutoComplete {
}
@Override
- public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
+ public PCollectionList<KV<String, List<CompletionCandidate>>> expand(
PCollection<CompletionCandidate> input) {
if (minPrefix > 10) {
// Base case, partitioning to return the output in the expected format.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 0b92734..7c1284b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -307,7 +307,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
}
@Override
- public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+ public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
PCollectionView<Map<K, V>> view =
PCollectionViews.mapView(
input.getPipeline(),
@@ -352,7 +352,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
}
@Override
- public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
PCollectionView<Map<K, Iterable<V>>> view =
PCollectionViews.multimapView(
input.getPipeline(),
@@ -392,7 +392,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {}
@Override
- public PCollectionView<List<T>> apply(PCollection<T> input) {
+ public PCollectionView<List<T>> expand(PCollection<T> input) {
PCollectionView<List<T>> view =
PCollectionViews.listView(
input.getPipeline(),
@@ -423,7 +423,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) { }
@Override
- public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+ public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
PCollectionView<Iterable<T>> view =
PCollectionViews.iterableView(
input.getPipeline(),
@@ -465,7 +465,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
}
@Override
- public PCollectionView<T> apply(PCollection<T> input) {
+ public PCollectionView<T> expand(PCollection<T> input) {
Combine.Globally<T, T> combine = Combine.globally(
new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
if (!transform.hasDefaultValue()) {
@@ -523,7 +523,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
}
@Override
- public PCollectionView<OutputT> apply(PCollection<InputT> input) {
+ public PCollectionView<OutputT> expand(PCollection<InputT> input) {
PCollection<OutputT> combined =
input.apply(Combine.globally(transform.getCombineFn())
.withoutDefaults()
@@ -620,7 +620,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
}
@Override
- public PCollectionView<ViewT> apply(PCollection<List<ElemT>> input) {
+ public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
return view;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 40d8948..00c94d0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -756,7 +756,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public PCollection<KV<K1, Iterable<KV<K2, V>>>> apply(PCollection<KV<K1, KV<K2, V>>> input) {
+ public PCollection<KV<K1, Iterable<KV<K2, V>>>> expand(PCollection<KV<K1, KV<K2, V>>> input) {
PCollection<KV<K1, Iterable<KV<K2, V>>>> rval =
PCollection.<KV<K1, Iterable<KV<K2, V>>>>createPrimitiveOutputInternal(
input.getPipeline(),
@@ -814,7 +814,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>> apply(PCollection<T> input) {
+ public PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>> expand(
+ PCollection<T> input) {
@SuppressWarnings("unchecked")
Coder<W> windowCoder = (Coder<W>)
input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -902,7 +903,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public PCollectionView<T> apply(PCollection<T> input) {
+ public PCollectionView<T> expand(PCollection<T> input) {
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>)
input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -993,7 +994,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+ public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(
input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
return BatchViewAsList.applyForIterableLike(runner, input, view);
@@ -1097,7 +1098,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public PCollectionView<List<T>> apply(PCollection<T> input) {
+ public PCollectionView<List<T>> expand(PCollection<T> input) {
PCollectionView<List<T>> view = PCollectionViews.listView(
input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
return applyForIterableLike(runner, input, view);
@@ -1265,7 +1266,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+ public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
return this.<BoundedWindow>applyInternal(input);
}
@@ -1406,7 +1407,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
public PCollection<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>>
- apply(PCollection<KV<K, V>> input) {
+ expand(PCollection<KV<K, V>> input) {
@SuppressWarnings("unchecked")
Coder<W> windowCoder = (Coder<W>)
@@ -1754,7 +1755,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
return this.<BoundedWindow>applyInternal(input);
}
@@ -2056,13 +2057,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
if (transform.getSink() instanceof FileBasedSink) {
FileBasedSink<?> sink = (FileBasedSink<?>) transform.getSink();
PathValidator validator = runner.options.getPathValidator();
validator.validateOutputFilePrefixSupported(sink.getBaseOutputFilename());
}
- return transform.apply(input);
+ return transform.expand(input);
}
}
@@ -2071,7 +2072,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// ================================================================================
/**
- * Suppress application of {@link PubsubUnboundedSource#apply} in streaming mode so that we
+ * Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we
* can instead defer to Windmill's implementation.
*/
private static class StreamingPubsubIORead<T> extends PTransform<PBegin, PCollection<T>> {
@@ -2090,7 +2091,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
return PCollection.<T>createPrimitiveOutputInternal(
input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
.setCoder(transform.getElementCoder());
@@ -2155,7 +2156,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * Suppress application of {@link PubsubUnboundedSink#apply} in streaming mode so that we
+ * Suppress application of {@link PubsubUnboundedSink#expand} in streaming mode so that we
* can instead defer to Windmill's implementation.
*/
private static class StreamingPubsubIOWrite<T> extends PTransform<PCollection<T>, PDone> {
@@ -2174,7 +2175,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
return PDone.in(input.getPipeline());
}
@@ -2252,7 +2253,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public final PCollection<T> apply(PInput input) {
+ public final PCollection<T> expand(PInput input) {
source.validate();
if (source.requiresDeduping()) {
@@ -2277,7 +2278,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public final PCollection<ValueWithRecordId<T>> apply(PInput input) {
+ public final PCollection<ValueWithRecordId<T>> expand(PInput input) {
return PCollection.<ValueWithRecordId<T>>createPrimitiveOutputInternal(
input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED);
}
@@ -2327,7 +2328,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// more per-key overhead.
private static final int NUM_RESHARD_KEYS = 10000;
@Override
- public PCollection<T> apply(PCollection<ValueWithRecordId<T>> input) {
+ public PCollection<T> expand(PCollection<ValueWithRecordId<T>> input) {
return input
.apply(WithKeys.of(new SerializableFunction<ValueWithRecordId<T>, Integer>() {
@Override
@@ -2367,7 +2368,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public final PCollection<T> apply(PBegin input) {
+ public final PCollection<T> expand(PBegin input) {
source.validate();
return Pipeline.applyTransform(input, new DataflowUnboundedReadFromBoundedSource<>(source))
@@ -2425,7 +2426,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+ public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
PCollectionView<Map<K, V>> view =
PCollectionViews.mapView(
input.getPipeline(),
@@ -2470,7 +2471,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
PCollectionView<Map<K, Iterable<V>>> view =
PCollectionViews.multimapView(
input.getPipeline(),
@@ -2511,7 +2512,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
public StreamingViewAsList(DataflowRunner runner, View.AsList<T> transform) {}
@Override
- public PCollectionView<List<T>> apply(PCollection<T> input) {
+ public PCollectionView<List<T>> expand(PCollection<T> input) {
PCollectionView<List<T>> view =
PCollectionViews.listView(
input.getPipeline(),
@@ -2543,7 +2544,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
public StreamingViewAsIterable(DataflowRunner runner, View.AsIterable<T> transform) { }
@Override
- public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+ public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
PCollectionView<Iterable<T>> view =
PCollectionViews.iterableView(
input.getPipeline(),
@@ -2586,7 +2587,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public PCollectionView<T> apply(PCollection<T> input) {
+ public PCollectionView<T> expand(PCollection<T> input) {
Combine.Globally<T, T> combine = Combine.globally(
new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
if (!transform.hasDefaultValue()) {
@@ -2644,7 +2645,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public PCollectionView<OutputT> apply(PCollection<InputT> input) {
+ public PCollectionView<OutputT> expand(PCollection<InputT> input) {
PCollection<OutputT> combined =
input.apply(Combine.<InputT, OutputT>globally(transform.getCombineFn())
.withoutDefaults()
@@ -2770,7 +2771,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
- public OutputT apply(InputT input) {
+ public OutputT expand(InputT input) {
String mode = input.getPipeline().getOptions().as(StreamingOptions.class).isStreaming()
? "streaming" : "batch";
String name =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
index 62d4aff..68ee7bc 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
@@ -53,7 +53,7 @@ public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>>
}
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
WindowingStrategy<?, ?> outputStrategy =
transform.getOutputStrategyInternal(input.getWindowingStrategy());
if (transform.getWindowFn() != null) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index 96a35bc..e1eedd8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -93,7 +93,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
return input.getPipeline().apply(
Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 95c7132..ac4f2df 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -551,7 +551,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
}
@Override
- public PCollection<String> apply(PCollection<String> input) {
+ public PCollection<String> expand(PCollection<String> input) {
return PCollection.createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
@@ -585,7 +585,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
extends PTransform<PCollection<Integer>, PCollection<Integer>> {
@Override
- public PCollection<Integer> apply(PCollection<Integer> input) {
+ public PCollection<Integer> expand(PCollection<Integer> input) {
// Apply an operation so that this is a composite transform.
input.apply(Count.<Integer>perElement());
@@ -606,7 +606,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
extends PTransform<PCollection<Integer>, PDone> {
@Override
- public PDone apply(PCollection<Integer> input) {
+ public PDone expand(PCollection<Integer> input) {
// Apply an operation so that this is a composite transform.
input.apply(Count.<Integer>perElement());
@@ -631,7 +631,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
public final TupleTag<Void> doneTag = new TupleTag<>("done");
@Override
- public PCollectionTuple apply(PCollection<Integer> input) {
+ public PCollectionTuple expand(PCollection<Integer> input) {
PCollection<Integer> sum = input.apply(Sum.integersGlobally());
// Fails here when attempting to construct a tuple with an unbound object.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 5375c95..1959be5 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -840,7 +840,7 @@ public class DataflowRunnerTest {
public boolean translated = false;
@Override
- public PCollection<Integer> apply(PCollection<Integer> input) {
+ public PCollection<Integer> expand(PCollection<Integer> input) {
return PCollection.<Integer>createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
index 67408ae..c9c7806 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -104,7 +104,7 @@ public class DataflowGroupByKeyTest {
p.apply(
new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
- public PCollection<KV<String, Integer>> apply(PBegin input) {
+ public PCollection<KV<String, Integer>> expand(PBegin input) {
return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
index b9220af..4558683 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -93,7 +93,7 @@ public class DataflowViewTest {
.apply(
new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
- public PCollection<KV<String, Integer>> apply(PBegin input) {
+ public PCollection<KV<String, Integer>> expand(PBegin input) {
return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index d7e5207..b2672b5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -85,7 +85,7 @@ public class WordCount {
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
- public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+ public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
index b1c567c..0a56633 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
@@ -62,7 +62,7 @@ public final class ConsoleIO {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
return PDone.in(input.getPipeline());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index a08c54e..7ebba90 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -63,7 +63,7 @@ public final class CreateStream<T> {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
// Spark streaming micro batches are bounded by default
return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
index 042c316..f2457ce 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
@@ -94,7 +94,7 @@ public final class HadoopIO {
}
@Override
- public PCollection<KV<K, V>> apply(PBegin input) {
+ public PCollection<KV<K, V>> expand(PBegin input) {
return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED);
}
@@ -197,7 +197,7 @@ public final class HadoopIO {
}
@Override
- public PDone apply(PCollection<KV<K, V>> input) {
+ public PDone expand(PCollection<KV<K, V>> input) {
checkNotNull(
filenamePrefix, "need to set the filename prefix of an HadoopIO.Write transform");
checkNotNull(formatClass, "need to set the format class of an HadoopIO.Write transform");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
index 6944dbf..30b51e6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.PCollection;
public final class StorageLevelPTransform extends PTransform<PCollection<?>, PCollection<String>> {
@Override
- public PCollection<String> apply(PCollection<?> input) {
+ public PCollection<String> expand(PCollection<?> input) {
return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
WindowingStrategy.globalDefault(),
PCollection.IsBounded.BOUNDED);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
index 654614a..7580da7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
@@ -35,7 +35,7 @@ public class SinglePrimitiveOutputPTransform<T> extends PTransform<PInput, PColl
}
@Override
- public PCollection<T> apply(PInput input) {
+ public PCollection<T> expand(PInput input) {
try {
PCollection<T> collection = PCollection.<T>createPrimitiveOutputInternal(
input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index bd0c655..01a4cba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -281,7 +281,7 @@ public class AvroIO {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
if (filepattern == null) {
throw new IllegalStateException(
"need to set the filepattern of an AvroIO.Read transform");
@@ -795,7 +795,7 @@ public class AvroIO {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
if (filenamePrefix == null) {
throw new IllegalStateException(
"need to set the filename prefix of an AvroIO.Write transform");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 123dca8..f2ef358 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -84,7 +84,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
PCollection<ValueWithRecordId<T>> read = Pipeline.applyTransform(input,
Read.from(new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime)));
if (source.requiresDeduping()) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
index 456d291..3148d8d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
@@ -130,7 +130,7 @@ public class CountingInput {
}
@Override
- public PCollection<Long> apply(PBegin begin) {
+ public PCollection<Long> expand(PBegin begin) {
return begin.apply(Read.from(CountingSource.createSourceForSubrange(startIndex, endIndex)));
}
@@ -236,7 +236,7 @@ public class CountingInput {
@SuppressWarnings("deprecation")
@Override
- public PCollection<Long> apply(PBegin begin) {
+ public PCollection<Long> expand(PBegin begin) {
Unbounded<Long> read =
Read.from(
CountingSource.createUnbounded()