You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/08 17:47:01 UTC
[2/3] incubator-beam git commit: Rename PTransform.apply to PTransform.expand
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 9768788..9a6b534 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -700,7 +700,7 @@ public class PubsubIO {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
if (topic == null && subscription == null) {
throw new IllegalStateException("Need to set either the topic or the subscription for "
+ "a PubsubIO.Read transform");
@@ -1057,7 +1057,7 @@ public class PubsubIO {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
if (topic == null) {
throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 1a86a1c..1992cb8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -424,7 +424,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
input.apply("PubsubUnboundedSink.Window", Window.<T>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index cea74bc..da3b437 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -1339,7 +1339,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
return input.getPipeline().begin()
.apply(Read.from(new PubsubSource<T>(this)))
.apply("PubsubUnboundedSource.Stats",
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index f04fbaf..7ec3b0e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -101,7 +101,7 @@ public class Read {
}
@Override
- public final PCollection<T> apply(PBegin input) {
+ public final PCollection<T> expand(PBegin input) {
source.validate();
return PCollection.<T>createPrimitiveOutputInternal(input.getPipeline(),
@@ -169,7 +169,7 @@ public class Read {
}
@Override
- public final PCollection<T> apply(PBegin input) {
+ public final PCollection<T> expand(PBegin input) {
source.validate();
return PCollection.<T>createPrimitiveOutputInternal(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index e967a27..54e73d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -292,7 +292,7 @@ public class TextIO {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
if (filepattern == null) {
throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
}
@@ -742,7 +742,7 @@ public class TextIO {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
if (filenamePrefix == null) {
throw new IllegalStateException(
"need to set the filename prefix of a TextIO.Write transform");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index 7559fca..bc651d8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -104,7 +104,7 @@ public class Write {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
checkArgument(IsBounded.BOUNDED == input.isBounded(),
"%s can only be applied to a Bounded PCollection",
Write.class.getSimpleName());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index 77f5128..8604dbc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -73,6 +73,6 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
*/
public <OutputT extends POutput, InputT extends PInput> OutputT apply(
PTransform<InputT, OutputT> transform, InputT input) {
- return transform.apply(input);
+ return transform.expand(input);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
index 2b311b7..bf2cd0b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
@@ -55,7 +55,7 @@ class GatherAllPanes<T>
private GatherAllPanes() {}
@Override
- public PCollection<Iterable<ValueInSingleWindow<T>>> apply(PCollection<T> input) {
+ public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();
return input
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 7dc78d8..b23f4f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -763,7 +763,7 @@ public class PAssert {
}
@Override
- public PCollectionView<ActualT> apply(PBegin input) {
+ public PCollectionView<ActualT> expand(PBegin input) {
final Coder<T> coder = actual.getCoder();
return actual
.apply("FilterActuals", rewindowActuals.<T>prepareActuals())
@@ -833,7 +833,7 @@ public class PAssert {
}
@Override
- public PCollection<Iterable<ValueInSingleWindow<T>>> apply(PCollection<T> input) {
+ public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
final int combinedKey = 42;
// Remove the triggering on both
@@ -925,7 +925,7 @@ public class PAssert {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
input
.apply("GroupGlobally", new GroupGlobally<T>(rewindowingStrategy))
.apply("GetPane", MapElements.via(paneExtractor))
@@ -958,7 +958,7 @@ public class PAssert {
}
@Override
- public PDone apply(PCollection<Iterable<T>> input) {
+ public PDone expand(PCollection<Iterable<T>> input) {
input
.apply("GroupGlobally", new GroupGlobally<Iterable<T>>(rewindowingStrategy))
.apply("GetPane", MapElements.via(paneExtractor))
@@ -995,7 +995,7 @@ public class PAssert {
}
@Override
- public PDone apply(PBegin input) {
+ public PDone expand(PBegin input) {
final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual);
input
@@ -1321,7 +1321,7 @@ public class PAssert {
}
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
return input.apply("FilterWindows", ParDo.of(new Fn()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index 509bb24..da93cdc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -252,7 +252,7 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
throw new IllegalStateException(
String.format(
"Pipeline Runner %s does not provide a required override for %s",
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
index 71c2158..33820e0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
@@ -204,7 +204,7 @@ public class ApproximateUnique {
}
@Override
- public PCollection<Long> apply(PCollection<T> input) {
+ public PCollection<Long> expand(PCollection<T> input) {
Coder<T> coder = input.getCoder();
return input.apply(
Combine.globally(
@@ -271,7 +271,7 @@ public class ApproximateUnique {
}
@Override
- public PCollection<KV<K, Long>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KV<K, Long>> expand(PCollection<KV<K, V>> input) {
Coder<KV<K, V>> inputCoder = input.getCoder();
if (!(inputCoder instanceof KvCoder)) {
throw new IllegalStateException(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 4127d94..3b07260 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1431,7 +1431,7 @@ public class Combine {
}
@Override
- public PCollection<OutputT> apply(PCollection<InputT> input) {
+ public PCollection<OutputT> expand(PCollection<InputT> input) {
PCollection<KV<Void, InputT>> withKeys = input
.apply(WithKeys.<Void, InputT>of((Void) null))
.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
@@ -1569,7 +1569,7 @@ public class Combine {
}
@Override
- public PCollectionView<OutputT> apply(PCollection<InputT> input) {
+ public PCollectionView<OutputT> expand(PCollection<InputT> input) {
Globally<InputT, OutputT> combineGlobally =
Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout);
if (insertDefault) {
@@ -1866,7 +1866,7 @@ public class Combine {
}
@Override
- public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) {
+ public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
return input
.apply(GroupByKey.<K, InputT>create(fewKeys))
.apply(Combine.<K, InputT, OutputT>groupedValues(fn, fnDisplayData)
@@ -1901,7 +1901,7 @@ public class Combine {
}
@Override
- public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) {
+ public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
return applyHelper(input);
}
@@ -2388,7 +2388,7 @@ public class Combine {
}
@Override
- public PCollection<KV<K, OutputT>> apply(
+ public PCollection<KV<K, OutputT>> expand(
PCollection<? extends KV<K, ? extends Iterable<InputT>>> input) {
PCollection<KV<K, OutputT>> output = input.apply(ParDo.of(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index b393a30..9101996 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -103,7 +103,7 @@ public class Count {
public PerElement() { }
@Override
- public PCollection<KV<T, Long>> apply(PCollection<T> input) {
+ public PCollection<KV<T, Long>> expand(PCollection<T> input) {
return
input
.apply("Init", MapElements.via(new SimpleFunction<T, KV<T, Void>>() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 7cd4711..a48136f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -240,7 +240,7 @@ public class Create<T> {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
try {
Coder<T> coder = getDefaultOutputCoder(input);
try {
@@ -440,7 +440,7 @@ public class Create<T> {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
try {
Iterable<T> rawElements =
Iterables.transform(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
index fba428b..2d08cee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
@@ -82,7 +82,7 @@ public class Distinct<T> extends PTransform<PCollection<T>,
}
@Override
- public PCollection<T> apply(PCollection<T> in) {
+ public PCollection<T> expand(PCollection<T> in) {
return in
.apply("CreateIndex", MapElements.via(new SimpleFunction<T, KV<T, Void>>() {
@Override
@@ -121,7 +121,7 @@ public class Distinct<T> extends PTransform<PCollection<T>,
}
@Override
- public PCollection<T> apply(PCollection<T> in) {
+ public PCollection<T> expand(PCollection<T> in) {
WithKeys<IdT, T> withKeys = WithKeys.of(fn);
if (representativeType != null) {
withKeys = withKeys.withKeyType(representativeType);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index 2d9bdee..a564999 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -201,7 +201,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
}
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
return input.apply(ParDo.of(new DoFn<T, T>() {
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index 4ef809f..c165f7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -129,7 +129,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
}
@Override
- public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+ public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
return input.apply(
"FlatMap",
ParDo.of(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index e612836..3ef2e55 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -105,7 +105,7 @@ public class Flatten {
private FlattenPCollectionList() { }
@Override
- public PCollection<T> apply(PCollectionList<T> inputs) {
+ public PCollection<T> expand(PCollectionList<T> inputs) {
WindowingStrategy<?, ?> windowingStrategy;
IsBounded isBounded = IsBounded.BOUNDED;
if (!inputs.getAll().isEmpty()) {
@@ -163,7 +163,7 @@ public class Flatten {
extends PTransform<PCollection<? extends Iterable<T>>, PCollection<T>> {
@Override
- public PCollection<T> apply(PCollection<? extends Iterable<T>> in) {
+ public PCollection<T> expand(PCollection<? extends Iterable<T>> in) {
Coder<? extends Iterable<T>> inCoder = in.getCoder();
if (!(inCoder instanceof IterableLikeCoder)) {
throw new IllegalArgumentException(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index 1faac59..a339af7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -216,7 +216,7 @@ public class GroupByKey<K, V>
}
@Override
- public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
// This primitive operation groups by the combination of key and window,
// merging windows as needed, using the windows assigned to the
// key/value input elements and the window merge operation of the
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
index 2405adf..c6f307d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
@@ -56,7 +56,7 @@ public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>,
private Keys() { }
@Override
- public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) {
+ public PCollection<K> expand(PCollection<? extends KV<K, ?>> in) {
return
in.apply("Keys", MapElements.via(new SimpleFunction<KV<K, ?>, K>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
index 2b81ebf..dbe262b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
@@ -60,7 +60,7 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>,
private KvSwap() { }
@Override
- public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) {
+ public PCollection<KV<V, K>> expand(PCollection<KV<K, V>> in) {
return
in.apply("KvSwap", MapElements.via(new SimpleFunction<KV<K, V>, KV<V, K>>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
index 83cceca..9c2d715 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
@@ -159,7 +159,7 @@ public class Latest {
private static class Globally<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
Coder<T> inputCoder = input.getCoder();
return input
@@ -178,7 +178,7 @@ public class Latest {
private static class PerKey<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
@Override
- public PCollection<KV<K, V>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
checkNotNull(input);
checkArgument(input.getCoder() instanceof KvCoder,
"Input specifiedCoder must be an instance of KvCoder, but was %s", input.getCoder());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index c109034..421b2ab 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -111,7 +111,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
}
@Override
- public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+ public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
return input.apply(
"Map",
ParDo.of(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 83fe577..ce4891d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -129,7 +129,7 @@ import org.apache.beam.sdk.values.TypedPValue;
* output value as their apply implementation.
* The majority of PTransforms are
* implemented as composites of other PTransforms. Such a PTransform
- * subclass typically just implements {@link #apply}, computing its
+ * subclass typically just implements {@link #expand}, computing its
* Output value from its {@code InputT} value. User programs are encouraged to
* use this mechanism to modularize their own code. Such composite
* abstractions get their own name, and navigating through the
@@ -181,7 +181,7 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
* a new unbound output and register evaluators (via backend-specific
* registration methods).
*/
- public abstract OutputT apply(InputT input);
+ public abstract OutputT expand(InputT input);
/**
* Called before invoking apply (which may be intercepted by the runner) to
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index ba6e644..4f7491e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -795,7 +795,7 @@ public class ParDo {
}
@Override
- public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+ public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
checkArgument(
!isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner");
validateWindowType(input, fn);
@@ -1052,7 +1052,7 @@ public class ParDo {
@Override
- public PCollectionTuple apply(PCollection<? extends InputT> input) {
+ public PCollectionTuple expand(PCollection<? extends InputT> input) {
checkArgument(
!isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner");
validateWindowType(input, fn);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
index b525872..e0b2b61 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
@@ -100,7 +100,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
/////////////////////////////////////////////////////////////////////////////
@Override
- public PCollectionList<T> apply(PCollection<T> in) {
+ public PCollectionList<T> expand(PCollection<T> in) {
final TupleTagList outputTags = partitionDoFn.getOutputTags();
PCollectionTuple outputs = in.apply(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
index a94130d..14c5d1b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
@@ -179,7 +179,7 @@ public class Regex {
this.group = group;
}
- public PCollection<String> apply(PCollection<String> in) {
+ public PCollection<String> expand(PCollection<String> in) {
return in.apply(
ParDo.of(
new DoFn<String, String>() {
@@ -224,7 +224,7 @@ public class Regex {
this.valueGroup = valueGroup;
}
- public PCollection<KV<String, String>> apply(PCollection<String> in) {
+ public PCollection<KV<String, String>> expand(PCollection<String> in) {
return in.apply(
ParDo.of(
new DoFn<String, KV<String, String>>() {
@@ -266,7 +266,7 @@ public class Regex {
this.group = group;
}
- public PCollection<String> apply(PCollection<String> in) {
+ public PCollection<String> expand(PCollection<String> in) {
return in.apply(
ParDo.of(
new DoFn<String, String>() {
@@ -312,7 +312,7 @@ public class Regex {
this.valueGroup = valueGroup;
}
- public PCollection<KV<String, String>> apply(PCollection<String> in) {
+ public PCollection<KV<String, String>> expand(PCollection<String> in) {
return in.apply(
ParDo.of(
new DoFn<String, KV<String, String>>() {
@@ -354,7 +354,7 @@ public class Regex {
this.replacement = replacement;
}
- public PCollection<String> apply(PCollection<String> in) {
+ public PCollection<String> expand(PCollection<String> in) {
return in.apply(
ParDo.of(
new DoFn<String, String>() {
@@ -393,7 +393,7 @@ public class Regex {
this.replacement = replacement;
}
- public PCollection<String> apply(PCollection<String> in) {
+ public PCollection<String> expand(PCollection<String> in) {
return in.apply(
ParDo.of(
new DoFn<String, String>() {
@@ -434,7 +434,7 @@ public class Regex {
this.outputEmpty = outputEmpty;
}
- public PCollection<String> apply(PCollection<String> in) {
+ public PCollection<String> expand(PCollection<String> in) {
return in.apply(
ParDo.of(
new DoFn<String, String>() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index eca987a..7d4e630 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -143,7 +143,7 @@ public class Sample {
}
@Override
- public PCollection<T> apply(PCollection<T> in) {
+ public PCollection<T> expand(PCollection<T> in) {
PCollectionView<Iterable<T>> iterableView = in.apply(View.<T>asIterable());
return
in.getPipeline()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
index d21d100..faf402c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
@@ -56,7 +56,7 @@ public class Values<V> extends PTransform<PCollection<? extends KV<?, V>>,
private Values() { }
@Override
- public PCollection<V> apply(PCollection<? extends KV<?, V>> in) {
+ public PCollection<V> expand(PCollection<? extends KV<?, V>> in) {
return
in.apply("Values", MapElements.via(new SimpleFunction<KV<?, V>, V>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 5fafc0a..126679d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -251,7 +251,7 @@ public class View {
}
@Override
- public PCollectionView<List<T>> apply(PCollection<T> input) {
+ public PCollectionView<List<T>> expand(PCollection<T> input) {
return input.apply(CreatePCollectionView.<T, List<T>>of(PCollectionViews.listView(
input.getPipeline(), input.getWindowingStrategy(), input.getCoder())));
}
@@ -277,7 +277,7 @@ public class View {
}
@Override
- public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+ public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
return input.apply(CreatePCollectionView.<T, Iterable<T>>of(PCollectionViews.iterableView(
input.getPipeline(), input.getWindowingStrategy(), input.getCoder())));
}
@@ -334,7 +334,7 @@ public class View {
}
@Override
- public PCollectionView<T> apply(PCollection<T> input) {
+ public PCollectionView<T> expand(PCollection<T> input) {
return input.apply(CreatePCollectionView.<T, T>of(PCollectionViews.singletonView(
input.getPipeline(),
input.getWindowingStrategy(),
@@ -364,7 +364,7 @@ public class View {
}
@Override
- public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(
PCollectionViews.multimapView(
input.getPipeline(),
@@ -401,7 +401,7 @@ public class View {
}
@Override
- public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+ public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(
PCollectionViews.mapView(
input.getPipeline(),
@@ -439,7 +439,7 @@ public class View {
}
@Override
- public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
+ public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
return view;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index de28ecb..dd38006 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -111,7 +111,7 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
}
@Override
- public PCollection<KV<K, V>> apply(PCollection<V> in) {
+ public PCollection<KV<K, V>> expand(PCollection<V> in) {
PCollection<KV<K, V>> result =
in.apply("AddKeys", MapElements.via(new SimpleFunction<V, KV<K, V>>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index 64e7c45..387707b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -98,7 +98,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
}
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
return input.apply(
"AddTimestamps", ParDo.of(new AddTimestampsDoFn<T>(fn, allowedTimestampSkew)));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
index b760e2c..1d10da4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
@@ -87,7 +87,7 @@ public class CoGroupByKey<K> extends
private CoGroupByKey() { }
@Override
- public PCollection<KV<K, CoGbkResult>> apply(
+ public PCollection<KV<K, CoGbkResult>> expand(
KeyedPCollectionTuple<K> input) {
if (input.isEmpty()) {
throw new IllegalArgumentException(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 5607762..0c430d0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -445,7 +445,7 @@ public class Window {
}
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
WindowingStrategy<?, ?> outputStrategy =
getOutputStrategyInternal(input.getWindowingStrategy());
return PCollection.createPrimitiveOutputInternal(
@@ -517,7 +517,7 @@ public class Window {
*/
private static class Remerge<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
WindowingStrategy<?, ?> outputWindowingStrategy = getOutputWindowing(
input.getWindowingStrategy());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index ebd44bf..4d86c74 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -50,7 +50,7 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
}
@Override
- public PCollection<KV<K, V>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
// If the input has already had its windows merged, then the GBK that performed the merge
// will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 0c743f7..fea1554 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -212,7 +212,7 @@ public class PipelineTest {
private static class IdentityTransform<T extends PInput & POutput>
extends PTransform<T, T> {
@Override
- public T apply(T input) {
+ public T expand(T input) {
return input;
}
}
@@ -247,7 +247,7 @@ public class PipelineTest {
}
@Override
- public PCollection<T> apply(PCollectionTuple input) {
+ public PCollection<T> expand(PCollectionTuple input) {
return input.get(tag);
}
}
@@ -281,7 +281,7 @@ public class PipelineTest {
}
@Override
- public PCollectionTuple apply(PCollection<T> input) {
+ public PCollectionTuple expand(PCollection<T> input) {
return PCollectionTuple.of(tag, input);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 530d755..d7badab 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -370,7 +370,7 @@ public class CoderRegistryTest {
@Override
public PCollection<KV<String, MySerializableGeneric<String>>>
- apply(PCollection<String> input) {
+ expand(PCollection<String> input) {
return input.apply(ParDo.of(new OutputDoFn()));
}
}
@@ -435,7 +435,7 @@ public class CoderRegistryTest {
@Override
public PCollection<KV<String, MySerializableGeneric<T>>>
- apply(PCollection<String> input) {
+ expand(PCollection<String> input) {
return input.apply(ParDo.of(new OutputDoFn()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 5be5ff1..5a7c994 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -123,7 +123,7 @@ public class WriteTest {
}
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
return input
.apply(window)
.apply(ParDo.of(new AddArbitraryKey<T>()))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index ea43188..b0c17d8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -98,7 +98,7 @@ public class TransformHierarchyTest {
pcList,
new PTransform<PCollectionList<Long>, PCollection<Long>>() {
@Override
- public PCollection<Long> apply(PCollectionList<Long> input) {
+ public PCollection<Long> expand(PCollectionList<Long> input) {
return input.get(0);
}
});
@@ -130,7 +130,7 @@ public class TransformHierarchyTest {
pcList,
new PTransform<PCollectionList<Long>, PCollectionList<Long>>() {
@Override
- public PCollectionList<Long> apply(PCollectionList<Long> input) {
+ public PCollectionList<Long> expand(PCollectionList<Long> input) {
return appended;
}
});
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index a81fb1a..d70aa2f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -73,7 +73,7 @@ public class TransformTreeTest {
extends PTransform<PBegin, PCollectionList<String>> {
@Override
- public PCollectionList<String> apply(PBegin b) {
+ public PCollectionList<String> expand(PBegin b) {
// Composite transform: apply delegates to other transformations,
// here a Create transform.
PCollection<String> result = b.apply(Create.of("hello", "world"));
@@ -95,7 +95,7 @@ public class TransformTreeTest {
extends PTransform<PCollection<Integer>, PDone> {
@Override
- public PDone apply(PCollection<Integer> input) {
+ public PDone expand(PCollection<Integer> input) {
// Apply an operation so that this is a composite transform.
input.apply(Count.<Integer>perElement());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
index 311c8de..bb2877e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
@@ -181,7 +181,7 @@ public class FlatMapElementsTest implements Serializable {
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Void>>> {
@Override
- public PCollection<KV<K, Void>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KV<K, Void>> expand(PCollection<KV<K, V>> input) {
return input.apply(FlatMapElements.<KV<K, V>, KV<K, Void>>via(
new SimpleFunction<KV<K, V>, Iterable<KV<K, Void>>>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 535ffec..ebde110 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -306,7 +306,7 @@ public class GroupByKeyTest {
p.apply(
new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
- public PCollection<KV<String, Integer>> apply(PBegin input) {
+ public PCollection<KV<String, Integer>> expand(PBegin input) {
return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index 4a34c57..ac3444b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -271,7 +271,7 @@ public class MapElementsTest implements Serializable {
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Void>>> {
@Override
- public PCollection<KV<K, Void>> apply(PCollection<KV<K, V>> input) {
+ public PCollection<KV<K, Void>> expand(PCollection<KV<K, V>> input) {
return input.apply(MapElements.via(
new SimpleFunction<KV<K, V>, KV<K, Void>>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PTransformTest.java
index bfe8225..2b7caee 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PTransformTest.java
@@ -36,7 +36,7 @@ public class PTransformTest {
PTransform<PCollection<String>, PCollection<String>> transform =
new PTransform<PCollection<String>, PCollection<String>>() {
@Override
- public PCollection<String> apply(PCollection<String> begin) {
+ public PCollection<String> expand(PCollection<String> begin) {
throw new IllegalArgumentException("Should never be applied");
}
};
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 9755076..41e795e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -272,7 +272,7 @@ public class ParDoTest implements Serializable {
private static final TupleTag<Integer> BY3 = new TupleTag<Integer>("by3"){};
@Override
- public PCollectionTuple apply(PCollection<Integer> input) {
+ public PCollectionTuple expand(PCollection<Integer> input) {
PCollection<Integer> by2 = input.apply("Filter2s", ParDo.of(new FilterFn(2)));
PCollection<Integer> by3 = input.apply("Filter3s", ParDo.of(new FilterFn(3)));
return PCollectionTuple.of(BY2, by2).and(BY3, by3);
@@ -840,7 +840,7 @@ public class ParDoTest implements Serializable {
.apply(Create.of(inputs))
.apply("CustomTransform", new PTransform<PCollection<Integer>, PCollection<String>>() {
@Override
- public PCollection<String> apply(PCollection<Integer> input) {
+ public PCollection<String> expand(PCollection<Integer> input) {
return input.apply(ParDo.of(new TestDoFn()));
}
});
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index fe2d125..3bf63fd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -1360,7 +1360,7 @@ public class ViewTest implements Serializable {
.apply(
new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
- public PCollection<KV<String, Integer>> apply(PBegin input) {
+ public PCollection<KV<String, Integer>> expand(PBegin input) {
return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index 7630779..feff333 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -46,7 +46,7 @@ public class DisplayDataEvaluatorTest implements Serializable {
PTransform<? super PCollection<String>, ? super POutput> myTransform =
new PTransform<PCollection<String>, POutput> () {
@Override
- public PCollection<String> apply(PCollection<String> input) {
+ public PCollection<String> expand(PCollection<String> input) {
return input.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
index f7f8d40..f892153 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
@@ -78,7 +78,7 @@ public class DisplayDataMatchersTest {
DisplayData data = DisplayData.from(new PTransform<PCollection<String>, PCollection<String>>() {
@Override
- public PCollection<String> apply(PCollection<String> input) {
+ public PCollection<String> expand(PCollection<String> input) {
throw new IllegalArgumentException("Should never be applied");
}
@@ -133,7 +133,7 @@ public class DisplayDataMatchersTest {
assertFalse(matcher.matches(DisplayData.from(
new PTransform<PCollection<String>, PCollection<String>>(){
@Override
- public PCollection<String> apply(PCollection<String> input) {
+ public PCollection<String> expand(PCollection<String> input) {
throw new IllegalArgumentException("Should never be applied");
}
})));
@@ -206,7 +206,7 @@ public class DisplayDataMatchersTest {
}
@Override
- public PCollection<String> apply(PCollection<String> input) {
+ public PCollection<String> expand(PCollection<String> input) {
throw new IllegalArgumentException("Should never be applied");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index fccd031..f5c1e73 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -111,7 +111,7 @@ public class DisplayDataTest implements Serializable {
Instant startTime = defaultStartTime;
@Override
- public PCollection<String> apply(PCollection<String> begin) {
+ public PCollection<String> expand(PCollection<String> begin) {
throw new IllegalArgumentException("Should never be applied");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index ab208dd..d4fab17 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -70,7 +70,7 @@ public class WindowingTest implements Serializable {
this.windowFn = windowFn;
}
@Override
- public PCollection<String> apply(PCollection<String> in) {
+ public PCollection<String> expand(PCollection<String> in) {
return in.apply("Window",
Window.<String>into(windowFn)
.withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
index 042e9e3..1ac176b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
@@ -69,13 +69,13 @@ public class StringUtilsTest {
private class EmbeddedPTransform extends PTransform<PBegin, PDone> {
@Override
- public PDone apply(PBegin begin) {
+ public PDone expand(PBegin begin) {
throw new IllegalArgumentException("Should never be applied");
}
private class Bound extends PTransform<PBegin, PDone> {
@Override
- public PDone apply(PBegin begin) {
+ public PDone expand(PBegin begin) {
throw new IllegalArgumentException("Should never be applied");
}
}
@@ -129,7 +129,7 @@ public class StringUtilsTest {
AnonymousClass anonymousClassObj = new AnonymousClass() {
class NamedInnerClass extends PTransform<PBegin, PDone> {
@Override
- public PDone apply(PBegin begin) {
+ public PDone expand(PBegin begin) {
throw new IllegalArgumentException("Should never be applied");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
index 4000e5d..e5f2019 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
@@ -48,7 +48,7 @@ public class PDoneTest {
*/
static class EmptyTransform extends PTransform<PBegin, PDone> {
@Override
- public PDone apply(PBegin begin) {
+ public PDone expand(PBegin begin) {
return PDone.in(begin.getPipeline());
}
}
@@ -64,7 +64,7 @@ public class PDoneTest {
}
@Override
- public PDone apply(PBegin begin) {
+ public PDone expand(PBegin begin) {
return
begin
.apply(Create.of(LINES))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
index 3bd9afa..d1b4d07 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
@@ -74,7 +74,7 @@ public class SortValues<PrimaryKeyT, SecondaryKeyT, ValueT>
}
@Override
- public PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> apply(
+ public PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> expand(
PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> input) {
return input.apply(
ParDo.of(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 8bfbd53..f99ca78 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -631,7 +631,7 @@ public class BigQueryIO {
}
@Override
- public PCollection<TableRow> apply(PBegin input) {
+ public PCollection<TableRow> expand(PBegin input) {
String uuid = randomUUIDString();
final String jobIdToken = "beam_job_" + uuid;
@@ -840,7 +840,7 @@ public class BigQueryIO {
}
@Override
- public PCollection<T> apply(PCollection<T> input) {
+ public PCollection<T> expand(PCollection<T> input) {
TupleTag<T> mainOutput = new TupleTag<>();
TupleTag<Void> cleanupSignal = new TupleTag<>();
PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>())
@@ -1846,7 +1846,7 @@ public class BigQueryIO {
}
@Override
- public PDone apply(PCollection<TableRow> input) {
+ public PDone expand(PCollection<TableRow> input) {
Pipeline p = input.getPipeline();
BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
BigQueryServices bqServices = getBigQueryServices();
@@ -2803,7 +2803,7 @@ public class BigQueryIO {
}
@Override
- public PDone apply(PCollection<TableRow> input) {
+ public PDone expand(PCollection<TableRow> input) {
// A naive implementation would be to simply stream data directly to BigQuery.
// However, this could occasionally lead to duplicated data, e.g., when
// a VM that runs this code is restarted and the code is re-run.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 1ee9253..a83784b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -286,7 +286,7 @@ public class BigtableIO {
}
@Override
- public PCollection<Row> apply(PBegin input) {
+ public PCollection<Row> expand(PBegin input) {
BigtableSource source =
new BigtableSource(new SerializableFunction<PipelineOptions, BigtableService>() {
@Override
@@ -500,7 +500,7 @@ public class BigtableIO {
}
@Override
- public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
+ public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
input.apply(ParDo.of(new BigtableWriterFn(tableId,
new SerializableFunction<PipelineOptions, BigtableService>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index d1a9a67..1e8271c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -411,7 +411,7 @@ public class DatastoreV1 {
}
@Override
- public PCollection<Entity> apply(PBegin input) {
+ public PCollection<Entity> expand(PBegin input) {
V1Options v1Options = V1Options.from(getProjectId(), getQuery(),
getNamespace());
@@ -779,7 +779,7 @@ public class DatastoreV1 {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
input.apply("Convert to Mutation", MapElements.via(mutationFn))
.apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(projectId)));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 9644a65..c625287 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -296,7 +296,7 @@ public class JdbcIO {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
return input
.apply(Create.of(getQuery()))
.apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder())
@@ -417,7 +417,7 @@ public class JdbcIO {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
input.apply(ParDo.of(new WriteFn<T>(this)));
return PDone.in(input.getPipeline());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 00b91ad..24fa67d 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -135,7 +135,7 @@ public class JmsIO {
}
@Override
- public PCollection<JmsRecord> apply(PBegin input) {
+ public PCollection<JmsRecord> expand(PBegin input) {
// handles unbounded source to bounded conversion if maxNumRecords is set.
Unbounded<JmsRecord> unbounded = org.apache.beam.sdk.io.Read.from(createSource());
@@ -434,7 +434,7 @@ public class JmsIO {
}
@Override
- public PDone apply(PCollection<String> input) {
+ public PDone expand(PCollection<String> input) {
input.apply(ParDo.of(new JmsWriter(connectionFactory, queue, topic)));
return PDone.in(input.getPipeline());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index c87d12b..735b8e7 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -445,7 +445,7 @@ public class KafkaIO {
}
@Override
- public PCollection<KafkaRecord<K, V>> apply(PBegin input) {
+ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
// Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
Unbounded<KafkaRecord<K, V>> unbounded =
org.apache.beam.sdk.io.Read.from(makeSource());
@@ -544,9 +544,9 @@ public class KafkaIO {
}
@Override
- public PCollection<KV<K, V>> apply(PBegin begin) {
+ public PCollection<KV<K, V>> expand(PBegin begin) {
return typedRead
- .apply(begin)
+ .expand(begin)
.apply("Remove Kafka Metadata",
ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
@ProcessElement
@@ -1244,7 +1244,7 @@ public class KafkaIO {
}
@Override
- public PDone apply(PCollection<KV<K, V>> input) {
+ public PDone expand(PCollection<KV<K, V>> input) {
input.apply(ParDo.of(new KafkaWriter<K, V>(
topic, keyCoder, valueCoder, producerConfig, producerFactoryFnOpt)));
return PDone.in(input.getPipeline());
@@ -1311,7 +1311,7 @@ public class KafkaIO {
}
@Override
- public PDone apply(PCollection<V> input) {
+ public PDone expand(PCollection<V> input) {
return input
.apply("Kafka values with default key",
MapElements.via(new SimpleFunction<V, KV<Void, V>>() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index a482df4..b65e671 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -308,7 +308,7 @@ public class MongoDbGridFSIO {
}
@Override
- public PCollection<T> apply(PBegin input) {
+ public PCollection<T> expand(PBegin input) {
final BoundedGridFSSource source = new BoundedGridFSSource(this, null);
org.apache.beam.sdk.io.Read.Bounded<ObjectId> objectIds =
org.apache.beam.sdk.io.Read.from(source);
@@ -621,7 +621,7 @@ public class MongoDbGridFSIO {
}
@Override
- public PDone apply(PCollection<T> input) {
+ public PDone expand(PCollection<T> input) {
input.apply(ParDo.of(new GridFsWriteFn<T>(this)));
return PDone.in(input.getPipeline());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d607b5a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 70239e6..f539431 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -160,7 +160,7 @@ public class MongoDbIO {
}
@Override
- public PCollection<Document> apply(PBegin input) {
+ public PCollection<Document> expand(PBegin input) {
return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedMongoDbSource(this)));
}
@@ -441,7 +441,7 @@ public class MongoDbIO {
}
@Override
- public PDone apply(PCollection<Document> input) {
+ public PDone expand(PCollection<Document> input) {
input.apply(ParDo.of(new WriteFn(this)));
return PDone.in(input.getPipeline());
}