You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/04 01:26:17 UTC
[09/19] incubator-beam git commit: Rename DoFn to OldDoFn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index 5b9eeff..5e96c46 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -145,9 +145,9 @@ public class SerializationTest {
}
/**
- * A DoFn that tokenizes lines of text into individual words.
+ * An OldDoFn that tokenizes lines of text into individual words.
*/
- static class ExtractWordsFn extends DoFn<StringHolder, StringHolder> {
+ static class ExtractWordsFn extends OldDoFn<StringHolder, StringHolder> {
private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
@@ -173,9 +173,9 @@ public class SerializationTest {
}
/**
- * A DoFn that converts a Word and Count into a printable string.
+ * An OldDoFn that converts a Word and Count into a printable string.
*/
- private static class FormatCountsFn extends DoFn<KV<StringHolder, Long>, StringHolder> {
+ private static class FormatCountsFn extends OldDoFn<KV<StringHolder, Long>, StringHolder> {
@Override
public void processElement(ProcessContext c) {
c.output(new StringHolder(c.element().getKey() + ": " + c.element().getValue()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index 60b7f71..5775565 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringDelegateCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.junit.After;
@@ -54,7 +54,7 @@ public class SideEffectsTest implements Serializable {
pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
- pipeline.apply(Create.of("a")).apply(ParDo.of(new DoFn<String, String>() {
+ pipeline.apply(Create.of("a")).apply(ParDo.of(new OldDoFn<String, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
throw new UserException();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 904b448..c005f14 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -122,7 +122,7 @@ public class KafkaStreamingTest {
EMBEDDED_ZOOKEEPER.shutdown();
}
- private static class FormatKVFn extends DoFn<KV<String, String>, String> {
+ private static class FormatKVFn extends OldDoFn<KV<String, String>, String> {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().getKey() + "," + c.element().getValue());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 873a591..da4db93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.values.TypeDescriptor;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
index 9db6650..c34ce66 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.coders;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import com.fasterxml.jackson.annotation.JsonCreator;
-
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index 693791c..d41bd1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -22,7 +22,6 @@ import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import com.google.common.base.Converter;
import com.fasterxml.jackson.annotation.JsonCreator;
-
import org.joda.time.Instant;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index ecb1f0a..182fa1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -709,7 +709,7 @@ public class PubsubIO {
*
* <p>Public so can be suppressed by runners.
*/
- public class PubsubBoundedReader extends DoFn<Void, T> {
+ public class PubsubBoundedReader extends OldDoFn<Void, T> {
private static final int DEFAULT_PULL_SIZE = 100;
private static final int ACK_TIMEOUT_SEC = 60;
@@ -998,7 +998,7 @@ public class PubsubIO {
*
* <p>Public so can be suppressed by runners.
*/
- public class PubsubBoundedWriter extends DoFn<T, Void> {
+ public class PubsubBoundedWriter extends OldDoFn<T, Void> {
private static final int MAX_PUBLISH_BATCH_SIZE = 100;
private transient List<OutgoingMessage> output;
private transient PubsubClient pubsubClient;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 6f2b3ac..9e9536d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -31,8 +31,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -78,7 +78,7 @@ import javax.annotation.Nullable;
* <li>We try to send messages in batches while also limiting send latency.
* <li>No stats are logged. Rather some counters are used to keep track of elements and batches.
* <li>Though some background threads are used by the underlying netty system all actual Pubsub
- * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances
+ * calls are blocking. We rely on the underlying runner to allow multiple {@link OldDoFn} instances
* to execute concurrently and hide latency.
* <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer
* to dedup messages.
@@ -155,7 +155,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
/**
* Convert elements to messages and shard them.
*/
- private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> {
+ private static class ShardFn<T> extends OldDoFn<T, KV<Integer, OutgoingMessage>> {
private final Aggregator<Long, Long> elementCounter =
createAggregator("elements", new Sum.SumLongFn());
private final Coder<T> elementCoder;
@@ -207,7 +207,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
* Publish messages to Pubsub in batches.
*/
private static class WriterFn
- extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
+ extends OldDoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
private final PubsubClientFactory pubsubFactory;
private final TopicPath topic;
private final String timestampLabel;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index 07d355e..d98bd6a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -1107,7 +1107,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
// StatsFn
// ================================================================================
- private static class StatsFn<T> extends DoFn<T, T> {
+ private static class StatsFn<T> extends OldDoFn<T, T> {
private final Aggregator<Long, Long> elementCounter =
createAggregator("elements", new Sum.SumLongFn());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
index b8902f9..de00035 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
@@ -19,9 +19,9 @@ package org.apache.beam.sdk.io;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
-
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+
import org.joda.time.Instant;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index 42d3c05..3e997b0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -27,8 +27,8 @@ import org.apache.beam.sdk.io.Sink.WriteOperation;
import org.apache.beam.sdk.io.Sink.Writer;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -156,7 +156,7 @@ public class Write {
* Writes all the elements in a bundle using a {@link Writer} produced by the
* {@link WriteOperation} associated with the {@link Sink}.
*/
- private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
+ private class WriteBundles<WriteT> extends OldDoFn<T, WriteT> {
// Writer that will write the records in this bundle. Lazily
// initialized in processElement.
private Writer<T, WriteT> writer = null;
@@ -182,7 +182,7 @@ public class Write {
// Discard write result and close the write.
try {
writer.close();
- // The writer does not need to be reset, as this DoFn cannot be reused.
+ // The writer does not need to be reset, as this OldDoFn cannot be reused.
} catch (Exception closeException) {
if (closeException instanceof InterruptedException) {
// Do not silently ignore interrupted state.
@@ -217,7 +217,7 @@ public class Write {
*
* @see WriteBundles
*/
- private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
+ private class WriteShardedBundles<WriteT> extends OldDoFn<KV<Integer, Iterable<T>>, WriteT> {
private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
@@ -296,10 +296,11 @@ public class Write {
* <p>This singleton collection containing the WriteOperation is then used as a side input to a
* ParDo over the PCollection of elements to write. In this bundle-writing phase,
* {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
- * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
- * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for every
- * element in the bundle. The output of this ParDo is a PCollection of <i>writer result</i>
- * objects (see {@link Sink} for a description of writer results)-one for each bundle.
+ * {@link Writer#open} and {@link Writer#close} are called in {@link OldDoFn#startBundle} and
+ * {@link OldDoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
+ * every element in the bundle. The output of this ParDo is a PCollection of
+ * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for
+ * each bundle.
*
* <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
* the collection of writer results as a side-input. In this ParDo,
@@ -333,7 +334,7 @@ public class Write {
// Initialize the resource in a do-once ParDo on the WriteOperation.
operationCollection = operationCollection
.apply("Initialize", ParDo.of(
- new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
+ new OldDoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
WriteOperation<T, WriteT> writeOperation = c.element();
@@ -387,7 +388,7 @@ public class Write {
// ParDo. There is a dependency between this ParDo and the parallel write (the writer results
// collection as a side input), so it will happen after the parallel write.
operationCollection
- .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
+ .apply("Finalize", ParDo.of(new OldDoFn<WriteOperation<T, WriteT>, Integer>() {
@Override
public void processElement(ProcessContext c) throws Exception {
WriteOperation<T, WriteT> writeOperation = c.element();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
index e0a1ef3..b2df96e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
@@ -29,7 +29,6 @@ import com.google.common.base.Strings;
import com.google.common.io.Files;
import com.fasterxml.jackson.annotation.JsonIgnore;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index e89e5ad..aa9f13e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -22,8 +22,8 @@ import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer;
import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer;
import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Context;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.Context;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import com.google.auto.service.AutoService;
@@ -52,7 +52,7 @@ import javax.annotation.concurrent.ThreadSafe;
* and {@link PipelineOptionsFactory#as(Class)}. They can be created
* from command-line arguments with {@link PipelineOptionsFactory#fromArgs(String[])}.
* They can be converted to another type by invoking {@link PipelineOptions#as(Class)} and
- * can be accessed from within a {@link DoFn} by invoking
+ * can be accessed from within an {@link OldDoFn} by invoking
* {@link Context#getPipelineOptions()}.
*
* <p>For example:
@@ -151,7 +151,7 @@ import javax.annotation.concurrent.ThreadSafe;
* {@link PipelineOptionsFactory#withValidation()} is invoked.
*
* <p>{@link JsonIgnore @JsonIgnore} is used to prevent a property from being serialized and
- * available during execution of {@link DoFn}. See the Serialization section below for more
+ * available during execution of {@link OldDoFn}. See the Serialization section below for more
* details.
*
* <h2>Registration Of PipelineOptions</h2>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index f21b9b9..67fa2af 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -53,7 +53,6 @@ import com.google.common.collect.TreeMultimap;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java
index 815de82..607bdda 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.options;
import org.apache.beam.sdk.util.common.ReflectHelpers;
+
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java
index a42ece2..6f6836e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java
@@ -19,14 +19,14 @@ package org.apache.beam.sdk.runners;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import java.util.Collection;
import java.util.Map;
/**
* A collection of values associated with an {@link Aggregator}. Aggregators declared in a
- * {@link DoFn} are emitted on a per-{@code DoFn}-application basis.
+ * {@link OldDoFn} are emitted on a per-{@code OldDoFn}-application basis.
*
* @param <T> the output type of the aggregator
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index a202ed4..80340c2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -33,11 +33,11 @@ import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -762,7 +762,7 @@ public class PAssert {
.apply("RewindowActuals", rewindowActuals.<T>windowActuals())
.apply(
ParDo.of(
- new DoFn<T, T>() {
+ new OldDoFn<T, T>() {
@Override
public void processElement(ProcessContext context) throws CoderException {
context.output(CoderUtils.clone(coder, context.element()));
@@ -884,7 +884,7 @@ public class PAssert {
}
}
- private static final class ConcatFn<T> extends DoFn<Iterable<Iterable<T>>, Iterable<T>> {
+ private static final class ConcatFn<T> extends OldDoFn<Iterable<Iterable<T>>, Iterable<T>> {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(Iterables.concat(c.element()));
@@ -995,13 +995,13 @@ public class PAssert {
}
/**
- * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a
+ * An {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of a
* {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
*
* <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support
* null values.
*/
- private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> {
+ private static class SideInputCheckerDoFn<ActualT> extends OldDoFn<Integer, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1030,13 +1030,13 @@ public class PAssert {
}
/**
- * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
+ * An {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of
* the single iterable element of the input {@link PCollection} and adjusts counters and
* thrown exceptions for use in testing.
*
* <p>The singleton property is presumed, not enforced.
*/
- private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> {
+ private static class GroupedValuesCheckerDoFn<ActualT> extends OldDoFn<ActualT, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1061,14 +1061,14 @@ public class PAssert {
}
/**
- * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
+ * An {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of
* the single item contained within the single iterable on input and
* adjusts counters and thrown exceptions for use in testing.
*
* <p>The singleton property of the input {@link PCollection} is presumed, not enforced. However,
* each input element must be a singleton iterable, or this will fail.
*/
- private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> {
+ private static class SingletonCheckerDoFn<ActualT> extends OldDoFn<Iterable<ActualT>, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1310,7 +1310,7 @@ public class PAssert {
}
/**
- * A DoFn that filters elements based on their presence in a static collection of windows.
+ * An OldDoFn that filters elements based on their presence in a static collection of windows.
*/
private static final class FilterWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
private final StaticWindows windows;
@@ -1324,7 +1324,7 @@ public class PAssert {
return input.apply("FilterWindows", ParDo.of(new Fn()));
}
- private class Fn extends DoFn<T, T> implements RequiresWindowAccess {
+ private class Fn extends OldDoFn<T, T> implements RequiresWindowAccess {
@Override
public void processElement(ProcessContext c) throws Exception {
if (windows.getWindows().contains(c.window())) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
index 45b0592..4e0c0be 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
@@ -35,7 +35,6 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-
import javax.annotation.Nullable;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 0de3024..98cdeba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -37,7 +37,6 @@ import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-
import org.junit.experimental.categories.Category;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
index ff553ba..c4596c1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
@@ -21,6 +21,7 @@ import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.PipelineOptions;
+
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
index c8aad78..db4ab33 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
@@ -24,8 +24,9 @@ import org.apache.beam.sdk.util.ExecutionContext;
* An {@code Aggregator<InputT>} enables monitoring of values of type {@code InputT},
* to be combined across all bundles.
*
- * <p>Aggregators are created by calling {@link DoFn#createAggregator DoFn.createAggregatorForDoFn},
- * typically from the {@link DoFn} constructor. Elements can be added to the
+ * <p>Aggregators are created by calling
+ * {@link OldDoFn#createAggregator OldDoFn.createAggregator},
+ * typically from the {@link OldDoFn} constructor. Elements can be added to the
* {@code Aggregator} by calling {@link Aggregator#addValue}.
*
* <p>Aggregators are visible in the monitoring UI, when the pipeline is run
@@ -36,7 +37,7 @@ import org.apache.beam.sdk.util.ExecutionContext;
*
* <p>Example:
* <pre> {@code
- * class MyDoFn extends DoFn<String, String> {
+ * class MyDoFn extends OldDoFn<String, String> {
* private Aggregator<Integer, Integer> myAggregator;
*
* public MyDoFn() {
@@ -78,8 +79,9 @@ public interface Aggregator<InputT, OutputT> {
/**
* Create an aggregator with the given {@code name} and {@link CombineFn}.
*
- * <p>This method is called to create an aggregator for a {@link DoFn}. It receives the class
- * of the {@link DoFn} being executed and the context of the step it is being executed in.
+ * <p>This method is called to create an aggregator for an {@link OldDoFn}. It receives the
+ * class of the {@link OldDoFn} being executed and the context of the step it is being
+ * executed in.
*/
<InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
Class<?> fnClass, ExecutionContext.StepContext stepContext,
@@ -88,7 +90,7 @@ public interface Aggregator<InputT, OutputT> {
// TODO: Consider the following additional API conveniences:
// - In addition to createAggregatorForDoFn(), consider adding getAggregator() to
- // avoid the need to store the aggregator locally in a DoFn, i.e., create
+ // avoid the need to store the aggregator locally in an OldDoFn, i.e., create
// if not already present.
// - Add a shortcut for the most common aggregator:
// c.createAggregatorForDoFn("name", new Sum.SumIntegerFn()).
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
index 97961e9..abed843 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.transforms;
import java.util.Collection;
/**
- * An internal class for extracting {@link Aggregator Aggregators} from {@link DoFn DoFns}.
+ * An internal class for extracting {@link Aggregator Aggregators} from {@link OldDoFn OldDoFns}.
*/
public final class AggregatorRetriever {
private AggregatorRetriever() {
@@ -28,9 +28,9 @@ public final class AggregatorRetriever {
}
/**
- * Returns the {@link Aggregator Aggregators} created by the provided {@link DoFn}.
+ * Returns the {@link Aggregator Aggregators} created by the provided {@link OldDoFn}.
*/
- public static Collection<Aggregator<?, ?>> getAggregators(DoFn<?, ?> fn) {
+ public static Collection<Aggregator<?, ?>> getAggregators(OldDoFn<?, ?> fn) {
return fn.getAggregators();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 96c03eb..6fc2324 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1473,9 +1473,9 @@ public class Combine {
PCollection<OutputT> defaultIfEmpty = maybeEmpty.getPipeline()
.apply("CreateVoid", Create.of((Void) null).withCoder(VoidCoder.of()))
.apply("ProduceDefault", ParDo.withSideInputs(maybeEmptyView).of(
- new DoFn<Void, OutputT>() {
+ new OldDoFn<Void, OutputT>() {
@Override
- public void processElement(DoFn<Void, OutputT>.ProcessContext c) {
+ public void processElement(OldDoFn<Void, OutputT>.ProcessContext c) {
Iterator<OutputT> combined = c.sideInput(maybeEmptyView).iterator();
if (!combined.hasNext()) {
c.output(defaultValue);
@@ -2097,7 +2097,7 @@ public class Combine {
final TupleTag<KV<KV<K, Integer>, InputT>> hot = new TupleTag<>();
final TupleTag<KV<K, InputT>> cold = new TupleTag<>();
PCollectionTuple split = input.apply("AddNonce", ParDo.of(
- new DoFn<KV<K, InputT>, KV<K, InputT>>() {
+ new OldDoFn<KV<K, InputT>, KV<K, InputT>>() {
transient int counter;
@Override
public void startBundle(Context c) {
@@ -2135,8 +2135,8 @@ public class Combine {
.setWindowingStrategyInternal(preCombineStrategy)
.apply("PreCombineHot", Combine.perKey(hotPreCombine))
.apply("StripNonce", ParDo.of(
- new DoFn<KV<KV<K, Integer>, AccumT>,
- KV<K, InputOrAccum<InputT, AccumT>>>() {
+ new OldDoFn<KV<KV<K, Integer>, AccumT>,
+ KV<K, InputOrAccum<InputT, AccumT>>>() {
@Override
public void processElement(ProcessContext c) {
c.output(KV.of(
@@ -2151,7 +2151,7 @@ public class Combine {
.get(cold)
.setCoder(inputCoder)
.apply("PrepareCold", ParDo.of(
- new DoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
+ new OldDoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
@Override
public void processElement(ProcessContext c) {
c.output(KV.of(c.element().getKey(),
@@ -2359,7 +2359,7 @@ public class Combine {
final PerKeyCombineFnRunner<? super K, ? super InputT, ?, OutputT> combineFnRunner =
PerKeyCombineFnRunners.create(fn);
PCollection<KV<K, OutputT>> output = input.apply(ParDo.of(
- new DoFn<KV<K, ? extends Iterable<InputT>>, KV<K, OutputT>>() {
+ new OldDoFn<KV<K, ? extends Iterable<InputT>>, KV<K, OutputT>>() {
@Override
public void processElement(ProcessContext c) {
K key = c.element().getKey();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index f2ed5e1..777deba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -90,7 +90,7 @@ public class CombineFns {
*
* PCollection<T> finalResultCollection = maxAndMean
* .apply(ParDo.of(
- * new DoFn<KV<K, CoCombineResult>, T>() {
+ * new OldDoFn<KV<K, CoCombineResult>, T>() {
* @Override
* public void processElement(ProcessContext c) throws Exception {
* KV<K, CoCombineResult> e = c.element();
@@ -133,7 +133,7 @@ public class CombineFns {
*
* PCollection<T> finalResultCollection = maxAndMean
* .apply(ParDo.of(
- * new DoFn<CoCombineResult, T>() {
+ * new OldDoFn<CoCombineResult, T>() {
* @Override
* public void processElement(ProcessContext c) throws Exception {
* CoCombineResult e = c.element();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 3a0fb5d..7601ffc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -107,7 +107,7 @@ public class Count {
public PCollection<KV<T, Long>> apply(PCollection<T> input) {
return
input
- .apply("Init", ParDo.of(new DoFn<T, KV<T, Void>>() {
+ .apply("Init", ParDo.of(new OldDoFn<T, KV<T, Void>>() {
@Override
public void processElement(ProcessContext c) {
c.output(KV.of(c.element(), (Void) null));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index fa645ab..fb7f784 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -486,7 +486,7 @@ public class Create<T> {
this.elementCoder = elementCoder;
}
- private static class ConvertTimestamps<T> extends DoFn<TimestampedValue<T>, T> {
+ private static class ConvertTimestamps<T> extends OldDoFn<TimestampedValue<T>, T> {
@Override
public void processElement(ProcessContext c) {
c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
deleted file mode 100644
index 6d5d1ed..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ /dev/null
@@ -1,565 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import com.google.common.base.MoreObjects;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
-
-/**
- * The argument to {@link ParDo} providing the code to use to process
- * elements of the input
- * {@link org.apache.beam.sdk.values.PCollection}.
- *
- * <p>See {@link ParDo} for more explanation, examples of use, and
- * discussion of constraints on {@code DoFn}s, including their
- * serializability, lack of access to global shared mutable state,
- * requirements for failure tolerance, and benefits of optimization.
- *
- * <p>{@code DoFn}s can be tested in the context of a particular
- * {@code Pipeline} by running that {@code Pipeline} on sample input
- * and then checking its output. Unit testing of a {@code DoFn},
- * separately from any {@code ParDo} transform or {@code Pipeline},
- * can be done via the {@link DoFnTester} harness.
- *
- * <p>{@link DoFnWithContext} (currently experimental) offers an alternative
- * mechanism for accessing {@link ProcessContext#window()} without the need
- * to implement {@link RequiresWindowAccess}.
- *
- * <p>See also {@link #processElement} for details on implementing the transformation
- * from {@code InputT} to {@code OutputT}.
- *
- * @param <InputT> the type of the (main) input elements
- * @param <OutputT> the type of the (main) output elements
- */
-public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayData {
-
- /**
- * Information accessible to all methods in this {@code DoFn}.
- * Used primarily to output elements.
- */
- public abstract class Context {
-
- /**
- * Returns the {@code PipelineOptions} specified with the
- * {@link org.apache.beam.sdk.runners.PipelineRunner}
- * invoking this {@code DoFn}. The {@code PipelineOptions} will
- * be the default running via {@link DoFnTester}.
- */
- public abstract PipelineOptions getPipelineOptions();
-
- /**
- * Adds the given element to the main output {@code PCollection}.
- *
- * <p>Once passed to {@code output} the element should be considered
- * immutable and not be modified in any way. It may be cached or retained
- * by the Dataflow runtime or later steps in the pipeline, or used in
- * other unspecified ways.
- *
- * <p>If invoked from {@link DoFn#processElement processElement}, the output
- * element will have the same timestamp and be in the same windows
- * as the input element passed to {@link DoFn#processElement processElement}.
- *
- * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element. The output element
- * will have a timestamp of negative infinity.
- */
- public abstract void output(OutputT output);
-
- /**
- * Adds the given element to the main output {@code PCollection},
- * with the given timestamp.
- *
- * <p>Once passed to {@code outputWithTimestamp} the element should not be
- * modified in any way.
- *
- * <p>If invoked from {@link DoFn#processElement processElement}, the timestamp
- * must not be older than the input element's timestamp minus
- * {@link DoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will
- * be in the same windows as the input element.
- *
- * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element except for the
- * timestamp.
- */
- public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
-
- /**
- * Adds the given element to the side output {@code PCollection} with the
- * given tag.
- *
- * <p>Once passed to {@code sideOutput} the element should not be modified
- * in any way.
- *
- * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags withOutputTags} to
- * specify the tags of side outputs that it consumes. Non-consumed side
- * outputs, e.g., outputs for monitoring purposes only, don't necessarily
- * need to be specified.
- *
- * <p>The output element will have the same timestamp and be in the same
- * windows as the input element passed to {@link DoFn#processElement processElement}.
- *
- * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element. The output element
- * will have a timestamp of negative infinity.
- *
- * @see ParDo#withOutputTags
- */
- public abstract <T> void sideOutput(TupleTag<T> tag, T output);
-
- /**
- * Adds the given element to the specified side output {@code PCollection},
- * with the given timestamp.
- *
- * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
- * modified in any way.
- *
- * <p>If invoked from {@link DoFn#processElement processElement}, the timestamp
- * must not be older than the input element's timestamp minus
- * {@link DoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will
- * be in the same windows as the input element.
- *
- * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element except for the
- * timestamp.
- *
- * @see ParDo#withOutputTags
- */
- public abstract <T> void sideOutputWithTimestamp(
- TupleTag<T> tag, T output, Instant timestamp);
-
- /**
- * Creates an {@link Aggregator} in the {@link DoFn} context with the
- * specified name and aggregation logic specified by {@link CombineFn}.
- *
- * <p>For internal use only.
- *
- * @param name the name of the aggregator
- * @param combiner the {@link CombineFn} to use in the aggregator
- * @return an aggregator for the provided name and {@link CombineFn} in this
- * context
- */
- @Experimental(Kind.AGGREGATOR)
- protected abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
- createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
-
- /**
- * Sets up {@link Aggregator}s created by the {@link DoFn} so they are
- * usable within this context.
- *
- * <p>This method should be called by runners before {@link DoFn#startBundle}
- * is executed.
- */
- @Experimental(Kind.AGGREGATOR)
- protected final void setupDelegateAggregators() {
- for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) {
- setupDelegateAggregator(aggregator);
- }
-
- aggregatorsAreFinal = true;
- }
-
- private final <AggInputT, AggOutputT> void setupDelegateAggregator(
- DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
-
- Aggregator<AggInputT, AggOutputT> delegate = createAggregatorInternal(
- aggregator.getName(), aggregator.getCombineFn());
-
- aggregator.setDelegate(delegate);
- }
- }
-
- /**
- * Information accessible when running {@link DoFn#processElement}.
- */
- public abstract class ProcessContext extends Context {
-
- /**
- * Returns the input element to be processed.
- *
- * <p>The element should be considered immutable. The Dataflow runtime will not mutate the
- * element, so it is safe to cache, etc. The element should not be mutated by any of the
- * {@link DoFn} methods, because it may be cached elsewhere, retained by the Dataflow runtime,
- * or used in other unspecified ways.
- */
- public abstract InputT element();
-
- /**
- * Returns the value of the side input for the window corresponding to the
- * window of the main input element.
- *
- * <p>See
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn#getSideInputWindow}
- * for how this corresponding window is determined.
- *
- * @throws IllegalArgumentException if this is not a side input
- * @see ParDo#withSideInputs
- */
- public abstract <T> T sideInput(PCollectionView<T> view);
-
- /**
- * Returns the timestamp of the input element.
- *
- * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
- * for more information.
- */
- public abstract Instant timestamp();
-
- /**
- * Returns the window into which the input element has been assigned.
- *
- * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
- * for more information.
- *
- * @throws UnsupportedOperationException if this {@link DoFn} does
- * not implement {@link RequiresWindowAccess}.
- */
- public abstract BoundedWindow window();
-
- /**
- * Returns information about the pane within this window into which the
- * input element has been assigned.
- *
- * <p>Generally all data is in a single, uninteresting pane unless custom
- * triggering and/or late data has been explicitly requested.
- * See {@link org.apache.beam.sdk.transforms.windowing.Window}
- * for more information.
- */
- public abstract PaneInfo pane();
-
- /**
- * Returns the process context to use for implementing windowing.
- */
- @Experimental
- public abstract WindowingInternals<InputT, OutputT> windowingInternals();
- }
-
- /**
- * Returns the allowed timestamp skew duration, which is the maximum
- * duration that timestamps can be shifted backward in
- * {@link DoFn.Context#outputWithTimestamp}.
- *
- * <p>The default value is {@code Duration.ZERO}, in which case
- * timestamps can only be shifted forward to future. For infinite
- * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
- *
- * <p> Note that producing an element whose timestamp is less than the
- * current timestamp may result in late data, i.e. returning a non-zero
- * value here does not impact watermark calculations used for firing
- * windows.
- *
- * @deprecated does not interact well with the watermark.
- */
- @Deprecated
- public Duration getAllowedTimestampSkew() {
- return Duration.ZERO;
- }
-
- /**
- * Interface for signaling that a {@link DoFn} needs to access the window the
- * element is being processed in, via {@link DoFn.ProcessContext#window}.
- */
- @Experimental
- public interface RequiresWindowAccess {}
-
- public DoFn() {
- this(new HashMap<String, DelegatingAggregator<?, ?>>());
- }
-
- DoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
- this.aggregators = aggregators;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- private final Map<String, DelegatingAggregator<?, ?>> aggregators;
-
- /**
- * Protects aggregators from being created after initialization.
- */
- private boolean aggregatorsAreFinal;
-
- /**
- * Prepares this {@code DoFn} instance for processing a batch of elements.
- *
- * <p>By default, does nothing.
- */
- public void startBundle(Context c) throws Exception {
- }
-
- /**
- * Processes one input element.
- *
- * <p>The current element of the input {@code PCollection} is returned by
- * {@link ProcessContext#element() c.element()}. It should be considered immutable. The Dataflow
- * runtime will not mutate the element, so it is safe to cache, etc. The element should not be
- * mutated by any of the {@link DoFn} methods, because it may be cached elsewhere, retained by the
- * Dataflow runtime, or used in other unspecified ways.
- *
- * <p>A value is added to the main output {@code PCollection} by {@link ProcessContext#output}.
- * Once passed to {@code output} the element should be considered immutable and not be modified in
- * any way. It may be cached elsewhere, retained by the Dataflow runtime, or used in other
- * unspecified ways.
- *
- * @see ProcessContext
- */
- public abstract void processElement(ProcessContext c) throws Exception;
-
- /**
- * Finishes processing this batch of elements.
- *
- * <p>By default, does nothing.
- */
- public void finishBundle(Context c) throws Exception {
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>By default, does not register any display data. Implementors may override this method
- * to provide their own display data.
- */
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Returns a {@link TypeDescriptor} capturing what is known statically
- * about the input type of this {@code DoFn} instance's most-derived
- * class.
- *
- * <p>See {@link #getOutputTypeDescriptor} for more discussion.
- */
- protected TypeDescriptor<InputT> getInputTypeDescriptor() {
- return new TypeDescriptor<InputT>(getClass()) {};
- }
-
- /**
- * Returns a {@link TypeDescriptor} capturing what is known statically
- * about the output type of this {@code DoFn} instance's
- * most-derived class.
- *
- * <p>In the normal case of a concrete {@code DoFn} subclass with
- * no generic type parameters of its own (including anonymous inner
- * classes), this will be a complete non-generic type, which is good
- * for choosing a default output {@code Coder<OutputT>} for the output
- * {@code PCollection<OutputT>}.
- */
- protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- return new TypeDescriptor<OutputT>(getClass()) {};
- }
-
- /**
- * Returns an {@link Aggregator} with aggregation logic specified by the
- * {@link CombineFn} argument. The name provided must be unique across
- * {@link Aggregator}s created within the DoFn. Aggregators can only be created
- * during pipeline construction.
- *
- * @param name the name of the aggregator
- * @param combiner the {@link CombineFn} to use in the aggregator
- * @return an aggregator for the provided name and combiner in the scope of
- * this DoFn
- * @throws NullPointerException if the name or combiner is null
- * @throws IllegalArgumentException if the given name collides with another
- * aggregator in this scope
- * @throws IllegalStateException if called during pipeline processing.
- */
- protected final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
- createAggregator(String name, CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
- checkNotNull(name, "name cannot be null");
- checkNotNull(combiner, "combiner cannot be null");
- checkArgument(!aggregators.containsKey(name),
- "Cannot create aggregator with name %s."
- + " An Aggregator with that name already exists within this scope.",
- name);
-
- checkState(!aggregatorsAreFinal, "Cannot create an aggregator during DoFn processing."
- + " Aggregators should be registered during pipeline construction.");
-
- DelegatingAggregator<AggInputT, AggOutputT> aggregator =
- new DelegatingAggregator<>(name, combiner);
- aggregators.put(name, aggregator);
- return aggregator;
- }
-
- /**
- * Returns an {@link Aggregator} with the aggregation logic specified by the
- * {@link SerializableFunction} argument. The name provided must be unique
- * across {@link Aggregator}s created within the DoFn. Aggregators can only be
- * created during pipeline construction.
- *
- * @param name the name of the aggregator
- * @param combiner the {@link SerializableFunction} to use in the aggregator
- * @return an aggregator for the provided name and combiner in the scope of
- * this DoFn
- * @throws NullPointerException if the name or combiner is null
- * @throws IllegalArgumentException if the given name collides with another
- * aggregator in this scope
- * @throws IllegalStateException if called during pipeline processing.
- */
- protected final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(String name,
- SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
- checkNotNull(combiner, "combiner cannot be null.");
- return createAggregator(name, Combine.IterableCombineFn.of(combiner));
- }
-
- /**
- * Returns the {@link Aggregator Aggregators} created by this {@code DoFn}.
- */
- Collection<Aggregator<?, ?>> getAggregators() {
- return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
- }
-
- /**
- * An {@link Aggregator} that delegates calls to addValue to another
- * aggregator.
- *
- * @param <AggInputT> the type of input element
- * @param <AggOutputT> the type of output element
- */
- static class DelegatingAggregator<AggInputT, AggOutputT> implements
- Aggregator<AggInputT, AggOutputT>, Serializable {
- private final UUID id;
-
- private final String name;
-
- private final CombineFn<AggInputT, ?, AggOutputT> combineFn;
-
- private Aggregator<AggInputT, ?> delegate;
-
- public DelegatingAggregator(String name,
- CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
- this.id = UUID.randomUUID();
- this.name = checkNotNull(name, "name cannot be null");
- // Safe contravariant cast
- @SuppressWarnings("unchecked")
- CombineFn<AggInputT, ?, AggOutputT> specificCombiner =
- (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null");
- this.combineFn = specificCombiner;
- }
-
- @Override
- public void addValue(AggInputT value) {
- if (delegate == null) {
- throw new IllegalStateException(
- "addValue cannot be called on Aggregator outside of the execution of a DoFn.");
- } else {
- delegate.addValue(value);
- }
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
- return combineFn;
- }
-
- /**
- * Sets the current delegate of the Aggregator.
- *
- * @param delegate the delegate to set in this aggregator
- */
- public void setDelegate(Aggregator<AggInputT, ?> delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("name", name)
- .add("combineFn", combineFn)
- .toString();
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, name, combineFn.getClass());
- }
-
- /**
- * Indicates whether some other object is "equal to" this one.
- *
- * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their
- * CombineFns are the same class, and they have identical IDs.
- */
- @Override
- public boolean equals(Object o) {
- if (o == this) {
- return true;
- }
- if (o == null) {
- return false;
- }
- if (o instanceof DelegatingAggregator) {
- DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o;
- return Objects.equals(this.id, that.id)
- && Objects.equals(this.name, that.name)
- && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass());
- }
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
index 0616eff..d8d4181 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
@@ -194,7 +194,7 @@ public abstract class DoFnReflector {
*/
public abstract boolean usesSingleWindow();
- /** Create an {@link DoFnInvoker} bound to the given {@link DoFn}. */
+ /** Create an {@link DoFnInvoker} bound to the given {@link OldDoFn}. */
public abstract <InputT, OutputT> DoFnInvoker<InputT, OutputT> bindInvoker(
DoFnWithContext<InputT, OutputT> fn);
@@ -217,9 +217,9 @@ public abstract class DoFnReflector {
}
/**
- * Create a {@link DoFn} that the {@link DoFnWithContext}.
+ * Create a {@link OldDoFn} that the {@link DoFnWithContext}.
*/
- public <InputT, OutputT> DoFn<InputT, OutputT> toDoFn(DoFnWithContext<InputT, OutputT> fn) {
+ public <InputT, OutputT> OldDoFn<InputT, OutputT> toDoFn(DoFnWithContext<InputT, OutputT> fn) {
if (usesSingleWindow()) {
return new WindowDoFnAdapter<InputT, OutputT>(this, fn);
} else {
@@ -287,7 +287,7 @@ public abstract class DoFnReflector {
* <li>Any generics on the extra context arguments match what is expected. Eg.,
* {@code WindowingInternals<InputT, OutputT>} either matches the
* {@code InputT} and {@code OutputT} parameters of the
- * {@code DoFn<InputT, OutputT>.ProcessContext}, or it uses a wildcard, etc.
+ * {@code OldDoFn<InputT, OutputT>.ProcessContext}, or it uses a wildcard, etc.
* </ol>
*
* @param m the method to verify
@@ -328,7 +328,7 @@ public abstract class DoFnReflector {
AdditionalParameter[] contextInfos = new AdditionalParameter[params.length - 1];
// Fill in the generics in the allExtraContextArgs interface from the types in the
- // Context or ProcessContext DoFn.
+ // Context or ProcessContext OldDoFn.
ParameterizedType pt = (ParameterizedType) contextToken.getType();
// We actually want the owner, since ProcessContext and Context are owned by DoFnWithContext.
pt = (ParameterizedType) pt.getOwnerType();
@@ -364,18 +364,18 @@ public abstract class DoFnReflector {
return ImmutableList.copyOf(contextInfos);
}
- /** Interface for invoking the {@code DoFn} processing methods. */
+ /** Interface for invoking the {@code OldDoFn} processing methods. */
public interface DoFnInvoker<InputT, OutputT> {
- /** Invoke {@link DoFn#startBundle} on the bound {@code DoFn}. */
+ /** Invoke {@link OldDoFn#startBundle} on the bound {@code OldDoFn}. */
void invokeStartBundle(
DoFnWithContext<InputT, OutputT>.Context c,
ExtraContextFactory<InputT, OutputT> extra);
- /** Invoke {@link DoFn#finishBundle} on the bound {@code DoFn}. */
+ /** Invoke {@link OldDoFn#finishBundle} on the bound {@code OldDoFn}. */
void invokeFinishBundle(
DoFnWithContext<InputT, OutputT>.Context c,
ExtraContextFactory<InputT, OutputT> extra);
- /** Invoke {@link DoFn#processElement} on the bound {@code DoFn}. */
+ /** Invoke {@link OldDoFn#processElement} on the bound {@code OldDoFn}. */
public void invokeProcessElement(
DoFnWithContext<InputT, OutputT>.ProcessContext c,
ExtraContextFactory<InputT, OutputT> extra);
@@ -565,10 +565,10 @@ public abstract class DoFnReflector {
extends DoFnWithContext<InputT, OutputT>.Context
implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> {
- private DoFn<InputT, OutputT>.Context context;
+ private OldDoFn<InputT, OutputT>.Context context;
private ContextAdapter(
- DoFnWithContext<InputT, OutputT> fn, DoFn<InputT, OutputT>.Context context) {
+ DoFnWithContext<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
fn.super();
this.context = context;
}
@@ -618,11 +618,11 @@ public abstract class DoFnReflector {
extends DoFnWithContext<InputT, OutputT>.ProcessContext
implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> {
- private DoFn<InputT, OutputT>.ProcessContext context;
+ private OldDoFn<InputT, OutputT>.ProcessContext context;
private ProcessContextAdapter(
DoFnWithContext<InputT, OutputT> fn,
- DoFn<InputT, OutputT>.ProcessContext context) {
+ OldDoFn<InputT, OutputT>.ProcessContext context) {
fn.super();
this.context = context;
}
@@ -683,7 +683,7 @@ public abstract class DoFnReflector {
}
}
- public static Class<?> getDoFnClass(DoFn<?, ?> fn) {
+ public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
if (fn instanceof SimpleDoFnAdapter) {
return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
} else {
@@ -691,7 +691,7 @@ public abstract class DoFnReflector {
}
}
- private static class SimpleDoFnAdapter<InputT, OutputT> extends DoFn<InputT, OutputT> {
+ private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
private final DoFnWithContext<InputT, OutputT> fn;
private transient DoFnInvoker<InputT, OutputT> invoker;
@@ -703,19 +703,19 @@ public abstract class DoFnReflector {
}
@Override
- public void startBundle(DoFn<InputT, OutputT>.Context c) throws Exception {
+ public void startBundle(OldDoFn<InputT, OutputT>.Context c) throws Exception {
ContextAdapter<InputT, OutputT> adapter = new ContextAdapter<>(fn, c);
invoker.invokeStartBundle(adapter, adapter);
}
@Override
- public void finishBundle(DoFn<InputT, OutputT>.Context c) throws Exception {
+ public void finishBundle(OldDoFn<InputT, OutputT>.Context c) throws Exception {
ContextAdapter<InputT, OutputT> adapter = new ContextAdapter<>(fn, c);
invoker.invokeFinishBundle(adapter, adapter);
}
@Override
- public void processElement(DoFn<InputT, OutputT>.ProcessContext c) throws Exception {
+ public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws Exception {
ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
invoker.invokeProcessElement(adapter, adapter);
}
@@ -743,7 +743,7 @@ public abstract class DoFnReflector {
}
private static class WindowDoFnAdapter<InputT, OutputT>
- extends SimpleDoFnAdapter<InputT, OutputT> implements DoFn.RequiresWindowAccess {
+ extends SimpleDoFnAdapter<InputT, OutputT> implements OldDoFn.RequiresWindowAccess {
private WindowDoFnAdapter(DoFnReflector reflector, DoFnWithContext<InputT, OutputT> fn) {
super(reflector, fn);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index a136632..9336e4c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -49,12 +49,12 @@ import java.util.List;
import java.util.Map;
/**
- * A harness for unit-testing a {@link DoFn}.
+ * A harness for unit-testing a {@link OldDoFn}.
*
* <p>For example:
*
* <pre> {@code
- * DoFn<InputT, OutputT> fn = ...;
+ * OldDoFn<InputT, OutputT> fn = ...;
*
* DoFnTester<InputT, OutputT> fnTester = DoFnTester.of(fn);
*
@@ -71,22 +71,22 @@ import java.util.Map;
* Assert.assertThat(fnTester.processBundle(i1, i2, ...), Matchers.hasItems(...));
* } </pre>
*
- * @param <InputT> the type of the {@code DoFn}'s (main) input elements
- * @param <OutputT> the type of the {@code DoFn}'s (main) output elements
+ * @param <InputT> the type of the {@code OldDoFn}'s (main) input elements
+ * @param <OutputT> the type of the {@code OldDoFn}'s (main) output elements
*/
public class DoFnTester<InputT, OutputT> {
/**
* Returns a {@code DoFnTester} supporting unit-testing of the given
- * {@link DoFn}.
+ * {@link OldDoFn}.
*/
@SuppressWarnings("unchecked")
- public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
+ public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
return new DoFnTester<InputT, OutputT>(fn);
}
/**
* Returns a {@code DoFnTester} supporting unit-testing of the given
- * {@link DoFn}.
+ * {@link OldDoFn}.
*/
@SuppressWarnings("unchecked")
public static <InputT, OutputT> DoFnTester<InputT, OutputT>
@@ -96,12 +96,12 @@ public class DoFnTester<InputT, OutputT> {
/**
* Registers the tuple of values of the side input {@link PCollectionView}s to
- * pass to the {@link DoFn} under test.
+ * pass to the {@link OldDoFn} under test.
*
* <p>Resets the state of this {@link DoFnTester}.
*
* <p>If this isn't called, {@code DoFnTester} assumes the
- * {@link DoFn} takes no side inputs.
+ * {@link OldDoFn} takes no side inputs.
*/
public void setSideInputs(Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs) {
this.sideInputs = sideInputs;
@@ -109,8 +109,8 @@ public class DoFnTester<InputT, OutputT> {
}
/**
- * Registers the values of a side input {@link PCollectionView} to pass to the {@link DoFn} under
- * test.
+ * Registers the values of a side input {@link PCollectionView} to pass to the {@link OldDoFn}
+ * under test.
*
* <p>The provided value is the final value of the side input in the specified window, not
* the value of the input PCollection in that window.
@@ -128,7 +128,7 @@ public class DoFnTester<InputT, OutputT> {
}
/**
- * Whether or not a {@link DoFnTester} should clone the {@link DoFn} under test.
+ * Whether or not a {@link DoFnTester} should clone the {@link OldDoFn} under test.
*/
public enum CloningBehavior {
CLONE,
@@ -136,14 +136,14 @@ public class DoFnTester<InputT, OutputT> {
}
/**
- * Instruct this {@link DoFnTester} whether or not to clone the {@link DoFn} under test.
+ * Instruct this {@link DoFnTester} whether or not to clone the {@link OldDoFn} under test.
*/
public void setCloningBehavior(CloningBehavior newValue) {
this.cloningBehavior = newValue;
}
/**
- * Indicates whether this {@link DoFnTester} will clone the {@link DoFn} under test.
+ * Indicates whether this {@link DoFnTester} will clone the {@link OldDoFn} under test.
*/
public CloningBehavior getCloningBehavior() {
return cloningBehavior;
@@ -165,7 +165,7 @@ public class DoFnTester<InputT, OutputT> {
}
/**
- * A convenience method for testing {@link DoFn DoFns} with bundles of elements.
+ * A convenience method for testing {@link OldDoFn DoFns} with bundles of elements.
* Logic proceeds as follows:
*
* <ol>
@@ -181,9 +181,9 @@ public class DoFnTester<InputT, OutputT> {
}
/**
- * Calls {@link DoFn#startBundle} on the {@code DoFn} under test.
+ * Calls {@link OldDoFn#startBundle} on the {@code OldDoFn} under test.
*
- * <p>If needed, first creates a fresh instance of the DoFn under test.
+ * <p>If needed, first creates a fresh instance of the OldDoFn under test.
*/
public void startBundle() throws Exception {
resetState();
@@ -195,14 +195,14 @@ public class DoFnTester<InputT, OutputT> {
}
/**
- * Calls {@link DoFn#processElement} on the {@code DoFn} under test, in a
- * context where {@link DoFn.ProcessContext#element} returns the
+ * Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a
+ * context where {@link OldDoFn.ProcessContext#element} returns the
* given element.
*
* <p>Will call {@link #startBundle} automatically, if it hasn't
* already been called.
*
- * @throws IllegalStateException if the {@code DoFn} under test has already
+ * @throws IllegalStateException if the {@code OldDoFn} under test has already
* been finished
*/
public void processElement(InputT element) throws Exception {
@@ -216,12 +216,12 @@ public class DoFnTester<InputT, OutputT> {
}
/**
- * Calls {@link DoFn#finishBundle} of the {@code DoFn} under test.
+ * Calls {@link OldDoFn#finishBundle} of the {@code OldDoFn} under test.
*
* <p>Will call {@link #startBundle} automatically, if it hasn't
* already been called.
*
- * @throws IllegalStateException if the {@code DoFn} under test has already
+ * @throws IllegalStateException if the {@code OldDoFn} under test has already
* been finished
*/
public void finishBundle() throws Exception {
@@ -403,18 +403,18 @@ public class DoFnTester<InputT, OutputT> {
return MoreObjects.firstNonNull(elems, Collections.<WindowedValue<T>>emptyList());
}
- private TestContext<InputT, OutputT> createContext(DoFn<InputT, OutputT> fn) {
+ private TestContext<InputT, OutputT> createContext(OldDoFn<InputT, OutputT> fn) {
return new TestContext<>(fn, options, mainOutputTag, outputs, accumulators);
}
- private static class TestContext<InT, OutT> extends DoFn<InT, OutT>.Context {
+ private static class TestContext<InT, OutT> extends OldDoFn<InT, OutT>.Context {
private final PipelineOptions opts;
private final TupleTag<OutT> mainOutputTag;
private final Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
private final Map<String, Object> accumulators;
public TestContext(
- DoFn<InT, OutT> fn,
+ OldDoFn<InT, OutT> fn,
PipelineOptions opts,
TupleTag<OutT> mainOutputTag,
Map<TupleTag<?>, List<WindowedValue<?>>> outputs,
@@ -498,7 +498,7 @@ public class DoFnTester<InputT, OutputT> {
}
private TestProcessContext<InputT, OutputT> createProcessContext(
- DoFn<InputT, OutputT> fn,
+ OldDoFn<InputT, OutputT> fn,
InputT elem) {
return new TestProcessContext<>(fn,
createContext(fn),
@@ -507,14 +507,14 @@ public class DoFnTester<InputT, OutputT> {
sideInputs);
}
- private static class TestProcessContext<InT, OutT> extends DoFn<InT, OutT>.ProcessContext {
+ private static class TestProcessContext<InT, OutT> extends OldDoFn<InT, OutT>.ProcessContext {
private final TestContext<InT, OutT> context;
private final TupleTag<OutT> mainOutputTag;
private final WindowedValue<InT> element;
private final Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs;
private TestProcessContext(
- DoFn<InT, OutT> fn,
+ OldDoFn<InT, OutT> fn,
TestContext<InT, OutT> context,
WindowedValue<InT> element,
TupleTag<OutT> mainOutputTag,
@@ -643,15 +643,15 @@ public class DoFnTester<InputT, OutputT> {
protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
throw new IllegalStateException("Aggregators should not be created within ProcessContext. "
- + "Instead, create an aggregator at DoFn construction time with createAggregatorForDoFn,"
- + " and ensure they are set up by the time startBundle is called "
- + "with setupDelegateAggregators.");
+ + "Instead, create an aggregator at OldDoFn construction time with"
+ + " createAggregatorForDoFn, and ensure they are set up by the time startBundle is"
+ + " called with setupDelegateAggregators.");
}
}
/////////////////////////////////////////////////////////////////////////////
- /** The possible states of processing a DoFn. */
+ /** The possible states of processing a OldDoFn. */
enum State {
UNSTARTED,
STARTED,
@@ -660,35 +660,35 @@ public class DoFnTester<InputT, OutputT> {
private final PipelineOptions options = PipelineOptionsFactory.create();
- /** The original DoFn under test. */
- private final DoFn<InputT, OutputT> origFn;
+ /** The original OldDoFn under test. */
+ private final OldDoFn<InputT, OutputT> origFn;
/**
- * Whether to clone the original {@link DoFn} or just use it as-is.
+ * Whether to clone the original {@link OldDoFn} or just use it as-is.
*
- * <p></p>Worker-side {@link DoFn DoFns} may not be serializable, and are not required to be.
+ * <p></p>Worker-side {@link OldDoFn DoFns} may not be serializable, and are not required to be.
*/
private CloningBehavior cloningBehavior = CloningBehavior.CLONE;
- /** The side input values to provide to the DoFn under test. */
+ /** The side input values to provide to the OldDoFn under test. */
private Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs =
new HashMap<>();
private Map<String, Object> accumulators;
- /** The output tags used by the DoFn under test. */
+ /** The output tags used by the OldDoFn under test. */
private TupleTag<OutputT> mainOutputTag = new TupleTag<>();
- /** The original DoFn under test, if started. */
- DoFn<InputT, OutputT> fn;
+ /** The original OldDoFn under test, if started. */
+ OldDoFn<InputT, OutputT> fn;
/** The ListOutputManager to examine the outputs. */
private Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
- /** The state of processing of the DoFn under test. */
+ /** The state of processing of the OldDoFn under test. */
private State state;
- private DoFnTester(DoFn<InputT, OutputT> origFn) {
+ private DoFnTester(OldDoFn<InputT, OutputT> origFn) {
this.origFn = origFn;
resetState();
}
@@ -705,7 +705,7 @@ public class DoFnTester<InputT, OutputT> {
if (cloningBehavior.equals(CloningBehavior.DO_NOT_CLONE)) {
fn = origFn;
} else {
- fn = (DoFn<InputT, OutputT>)
+ fn = (OldDoFn<InputT, OutputT>)
SerializableUtils.deserializeFromByteArray(
SerializableUtils.serializeToByteArray(origFn),
origFn.toString());