You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/04 01:26:20 UTC
[12/19] incubator-beam git commit: Rename DoFn to OldDoFn
Rename DoFn to OldDoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a64baf48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a64baf48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a64baf48
Branch: refs/heads/master
Commit: a64baf4878f28e98da696dacc587c1151d0cdb9e
Parents: 388816a
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jul 22 13:00:10 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 3 18:25:52 2016 -0700
----------------------------------------------------------------------
.../beam/examples/DebuggingWordCount.java | 6 +-
.../apache/beam/examples/MinimalWordCount.java | 7 +-
.../apache/beam/examples/WindowedWordCount.java | 10 +-
.../org/apache/beam/examples/WordCount.java | 8 +-
.../examples/common/PubsubFileInjector.java | 6 +-
.../beam/examples/complete/AutoComplete.java | 16 +-
.../examples/complete/StreamingWordExtract.java | 12 +-
.../apache/beam/examples/complete/TfIdf.java | 16 +-
.../examples/complete/TopWikipediaSessions.java | 12 +-
.../examples/complete/TrafficMaxLaneFlow.java | 10 +-
.../beam/examples/complete/TrafficRoutes.java | 12 +-
.../examples/cookbook/BigQueryTornadoes.java | 6 +-
.../cookbook/CombinePerKeyExamples.java | 6 +-
.../examples/cookbook/DatastoreWordCount.java | 11 +-
.../beam/examples/cookbook/FilterExamples.java | 12 +-
.../beam/examples/cookbook/JoinExamples.java | 10 +-
.../examples/cookbook/MaxPerKeyExamples.java | 6 +-
.../beam/examples/cookbook/TriggerExample.java | 12 +-
.../org/apache/beam/examples/WordCountTest.java | 2 +-
.../examples/complete/AutoCompleteTest.java | 4 +-
.../examples/cookbook/TriggerExampleTest.java | 4 +-
.../beam/examples/complete/game/GameStats.java | 10 +-
.../beam/examples/complete/game/UserScore.java | 4 +-
.../complete/game/utils/WriteToBigQuery.java | 12 +-
.../game/utils/WriteWindowedToBigQuery.java | 8 +-
.../examples/complete/game/UserScoreTest.java | 2 +-
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 12 +-
.../core/UnboundedReadFromBoundedSource.java | 2 +-
.../apache/beam/sdk/util/AssignWindowsDoFn.java | 10 +-
.../org/apache/beam/sdk/util/DoFnRunner.java | 21 +-
.../apache/beam/sdk/util/DoFnRunnerBase.java | 54 +-
.../org/apache/beam/sdk/util/DoFnRunners.java | 24 +-
.../beam/sdk/util/GroupAlsoByWindowsDoFn.java | 6 +-
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 4 +-
.../sdk/util/GroupByKeyViaGroupByKeyOnly.java | 6 +-
.../sdk/util/LateDataDroppingDoFnRunner.java | 4 +-
.../apache/beam/sdk/util/PaneInfoTracker.java | 1 -
.../apache/beam/sdk/util/ReduceFnRunner.java | 4 +-
.../apache/beam/sdk/util/SimpleDoFnRunner.java | 12 +-
.../org/apache/beam/sdk/util/WatermarkHold.java | 1 -
.../beam/sdk/util/ReduceFnRunnerTest.java | 1 +
.../apache/beam/sdk/util/ReduceFnTester.java | 1 +
.../beam/sdk/util/SimpleDoFnRunnerTest.java | 6 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 4 +-
.../ImmutabilityCheckingBundleFactory.java | 4 +-
.../beam/runners/direct/ParDoEvaluator.java | 4 +-
.../direct/ParDoMultiEvaluatorFactory.java | 11 +-
.../direct/ParDoSingleEvaluatorFactory.java | 11 +-
.../direct/TransformEvaluatorFactory.java | 6 +-
.../direct/WriteWithShardingFactory.java | 4 +-
.../ConsumerTrackingPipelineVisitorTest.java | 22 +-
.../beam/runners/direct/DirectRunnerTest.java | 24 +-
.../ImmutabilityCheckingBundleFactoryTest.java | 6 +-
.../ImmutabilityEnforcementFactoryTest.java | 6 +-
.../direct/KeyedPValueTrackingVisitorTest.java | 6 +-
.../beam/runners/direct/ParDoEvaluatorTest.java | 6 +-
.../direct/ParDoMultiEvaluatorFactoryTest.java | 10 +-
.../direct/ParDoSingleEvaluatorFactoryTest.java | 10 +-
.../runners/direct/WatermarkManagerTest.java | 7 +-
.../beam/runners/flink/examples/TFIDF.java | 16 +-
.../beam/runners/flink/examples/WordCount.java | 4 +-
.../flink/examples/streaming/AutoComplete.java | 16 +-
.../flink/examples/streaming/JoinExamples.java | 8 +-
.../examples/streaming/KafkaIOExamples.java | 4 +-
.../KafkaWindowedWordCountExample.java | 6 +-
.../examples/streaming/WindowedWordCount.java | 6 +-
.../FlinkBatchTransformTranslators.java | 12 +-
.../FlinkStreamingTransformTranslators.java | 9 +-
.../functions/FlinkDoFnFunction.java | 10 +-
.../FlinkMergingNonShuffleReduceFunction.java | 8 +-
.../functions/FlinkMultiOutputDoFnFunction.java | 10 +-
.../FlinkMultiOutputProcessContext.java | 6 +-
.../functions/FlinkNoElementAssignContext.java | 8 +-
.../functions/FlinkPartialReduceFunction.java | 8 +-
.../functions/FlinkProcessContext.java | 16 +-
.../functions/FlinkReduceFunction.java | 8 +-
.../streaming/FlinkAbstractParDoWrapper.java | 18 +-
.../FlinkGroupAlsoByWindowWrapper.java | 10 +-
.../streaming/FlinkParDoBoundMultiWrapper.java | 4 +-
.../streaming/FlinkParDoBoundWrapper.java | 4 +-
.../state/AbstractFlinkTimerInternals.java | 4 +-
.../beam/runners/flink/PipelineOptionsTest.java | 6 +-
.../beam/runners/flink/ReadSourceITCase.java | 4 +-
.../flink/ReadSourceStreamingITCase.java | 4 +-
.../flink/streaming/GroupByNullKeyTest.java | 8 +-
.../streaming/TopWikipediaSessionsITCase.java | 6 +-
.../dataflow/DataflowPipelineTranslator.java | 6 +-
.../beam/runners/dataflow/DataflowRunner.java | 83 ++-
.../dataflow/internal/AssignWindows.java | 6 +-
.../beam/runners/dataflow/util/DoFnInfo.java | 16 +-
.../DataflowPipelineTranslatorTest.java | 10 +-
.../beam/runners/spark/examples/WordCount.java | 4 +-
.../runners/spark/translation/DoFnFunction.java | 8 +-
.../spark/translation/MultiDoFnFunction.java | 8 +-
.../spark/translation/SparkProcessContext.java | 18 +-
.../spark/translation/TransformTranslator.java | 7 +-
.../streaming/StreamingTransformTranslator.java | 4 +-
.../apache/beam/runners/spark/TfIdfTest.java | 12 +-
.../spark/translation/CombinePerKeyTest.java | 4 +-
.../spark/translation/DoFnOutputTest.java | 4 +-
.../translation/MultiOutputWordCountTest.java | 8 +-
.../spark/translation/SerializationTest.java | 10 +-
.../spark/translation/SideEffectsTest.java | 4 +-
.../streaming/KafkaStreamingTest.java | 4 +-
.../org/apache/beam/sdk/coders/AvroCoder.java | 1 -
.../apache/beam/sdk/coders/DurationCoder.java | 1 -
.../apache/beam/sdk/coders/InstantCoder.java | 1 -
.../java/org/apache/beam/sdk/io/PubsubIO.java | 6 +-
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 8 +-
.../beam/sdk/io/PubsubUnboundedSource.java | 4 +-
.../java/org/apache/beam/sdk/io/Source.java | 2 +-
.../main/java/org/apache/beam/sdk/io/Write.java | 21 +-
.../org/apache/beam/sdk/options/GcpOptions.java | 1 -
.../beam/sdk/options/PipelineOptions.java | 8 +-
.../sdk/options/PipelineOptionsFactory.java | 1 -
.../sdk/options/PipelineOptionsReflector.java | 1 +
.../beam/sdk/runners/AggregatorValues.java | 4 +-
.../org/apache/beam/sdk/testing/PAssert.java | 24 +-
.../beam/sdk/testing/SerializableMatchers.java | 1 -
.../apache/beam/sdk/testing/TestPipeline.java | 1 -
.../beam/sdk/testing/TestPipelineOptions.java | 1 +
.../apache/beam/sdk/transforms/Aggregator.java | 14 +-
.../sdk/transforms/AggregatorRetriever.java | 6 +-
.../org/apache/beam/sdk/transforms/Combine.java | 14 +-
.../apache/beam/sdk/transforms/CombineFns.java | 4 +-
.../org/apache/beam/sdk/transforms/Count.java | 2 +-
.../org/apache/beam/sdk/transforms/Create.java | 2 +-
.../org/apache/beam/sdk/transforms/DoFn.java | 565 -------------------
.../beam/sdk/transforms/DoFnReflector.java | 38 +-
.../apache/beam/sdk/transforms/DoFnTester.java | 86 +--
.../beam/sdk/transforms/DoFnWithContext.java | 16 +-
.../org/apache/beam/sdk/transforms/Filter.java | 2 +-
.../beam/sdk/transforms/FlatMapElements.java | 2 +-
.../org/apache/beam/sdk/transforms/Flatten.java | 2 +-
.../apache/beam/sdk/transforms/GroupByKey.java | 2 +-
.../transforms/IntraBundleParallelization.java | 40 +-
.../org/apache/beam/sdk/transforms/Keys.java | 2 +-
.../org/apache/beam/sdk/transforms/KvSwap.java | 2 +-
.../apache/beam/sdk/transforms/MapElements.java | 2 +-
.../org/apache/beam/sdk/transforms/OldDoFn.java | 565 +++++++++++++++++++
.../apache/beam/sdk/transforms/PTransform.java | 2 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 203 +++----
.../apache/beam/sdk/transforms/Partition.java | 2 +-
.../beam/sdk/transforms/RemoveDuplicates.java | 2 +-
.../org/apache/beam/sdk/transforms/Sample.java | 4 +-
.../beam/sdk/transforms/SimpleFunction.java | 6 +-
.../org/apache/beam/sdk/transforms/Values.java | 2 +-
.../org/apache/beam/sdk/transforms/View.java | 8 +-
.../apache/beam/sdk/transforms/WithKeys.java | 2 +-
.../beam/sdk/transforms/WithTimestamps.java | 4 +-
.../sdk/transforms/display/DisplayData.java | 1 -
.../beam/sdk/transforms/join/CoGbkResult.java | 1 -
.../beam/sdk/transforms/join/CoGroupByKey.java | 14 +-
.../sdk/transforms/windowing/AfterEach.java | 1 +
.../windowing/AfterProcessingTime.java | 1 +
.../transforms/windowing/IntervalWindow.java | 1 -
.../beam/sdk/transforms/windowing/Never.java | 1 +
.../beam/sdk/transforms/windowing/PaneInfo.java | 10 +-
.../beam/sdk/transforms/windowing/Window.java | 4 +-
.../beam/sdk/util/BaseExecutionContext.java | 4 +-
.../apache/beam/sdk/util/BucketingFunction.java | 1 +
.../beam/sdk/util/CombineContextFactory.java | 6 +-
.../apache/beam/sdk/util/ExecutionContext.java | 8 +-
.../apache/beam/sdk/util/MovingFunction.java | 1 +
.../beam/sdk/util/PerKeyCombineFnRunner.java | 44 +-
.../beam/sdk/util/PerKeyCombineFnRunners.java | 30 +-
.../org/apache/beam/sdk/util/PubsubClient.java | 1 +
.../apache/beam/sdk/util/PubsubTestClient.java | 1 +
.../sdk/util/ReifyTimestampAndWindowsDoFn.java | 6 +-
.../org/apache/beam/sdk/util/Reshuffle.java | 4 +-
.../apache/beam/sdk/util/SerializableUtils.java | 2 +-
.../org/apache/beam/sdk/util/StringUtils.java | 2 +-
.../beam/sdk/util/SystemDoFnInternal.java | 6 +-
.../apache/beam/sdk/util/TimerInternals.java | 1 -
.../apache/beam/sdk/util/ValueWithRecordId.java | 6 +-
.../org/apache/beam/sdk/util/WindowedValue.java | 1 -
.../beam/sdk/util/WindowingInternals.java | 4 +-
.../beam/sdk/util/common/ReflectHelpers.java | 1 +
.../beam/sdk/values/TimestampedValue.java | 1 -
.../java/org/apache/beam/sdk/PipelineTest.java | 6 +-
.../apache/beam/sdk/coders/AvroCoderTest.java | 4 +-
.../beam/sdk/coders/CoderRegistryTest.java | 6 +-
.../beam/sdk/coders/SerializableCoderTest.java | 6 +-
.../org/apache/beam/sdk/io/AvroSourceTest.java | 1 +
.../io/BoundedReadFromUnboundedSourceTest.java | 1 +
.../beam/sdk/io/CompressedSourceTest.java | 1 +
.../apache/beam/sdk/io/CountingInputTest.java | 5 +-
.../apache/beam/sdk/io/CountingSourceTest.java | 4 +-
.../beam/sdk/io/OffsetBasedSourceTest.java | 1 +
.../beam/sdk/io/PubsubUnboundedSinkTest.java | 4 +-
.../java/org/apache/beam/sdk/io/ReadTest.java | 1 +
.../java/org/apache/beam/sdk/io/TextIOTest.java | 1 +
.../java/org/apache/beam/sdk/io/WriteTest.java | 7 +-
.../org/apache/beam/sdk/io/XmlSinkTest.java | 1 +
.../apache/beam/sdk/options/GcpOptionsTest.java | 1 +
.../sdk/options/GoogleApiDebugOptionsTest.java | 1 -
.../sdk/options/PipelineOptionsFactoryTest.java | 1 -
.../beam/sdk/options/PipelineOptionsTest.java | 1 -
.../sdk/options/ProxyInvocationHandlerTest.java | 2 +-
.../AggregatorPipelineExtractorTest.java | 6 +-
.../apache/beam/sdk/testing/PAssertTest.java | 1 -
.../beam/sdk/testing/TestPipelineTest.java | 1 -
.../transforms/ApproximateQuantilesTest.java | 1 +
.../sdk/transforms/ApproximateUniqueTest.java | 5 +-
.../beam/sdk/transforms/CombineFnsTest.java | 2 +-
.../apache/beam/sdk/transforms/CombineTest.java | 12 +-
.../apache/beam/sdk/transforms/CreateTest.java | 2 +-
.../beam/sdk/transforms/DoFnContextTest.java | 69 ---
.../DoFnDelegatingAggregatorTest.java | 16 +-
.../beam/sdk/transforms/DoFnReflectorTest.java | 2 +-
.../apache/beam/sdk/transforms/DoFnTest.java | 242 --------
.../beam/sdk/transforms/DoFnTesterTest.java | 10 +-
.../sdk/transforms/DoFnWithContextTest.java | 6 +-
.../apache/beam/sdk/transforms/FlattenTest.java | 4 +-
.../beam/sdk/transforms/GroupByKeyTest.java | 6 +-
.../IntraBundleParallelizationTest.java | 23 +-
.../beam/sdk/transforms/MapElementsTest.java | 1 +
.../org/apache/beam/sdk/transforms/MaxTest.java | 1 +
.../org/apache/beam/sdk/transforms/MinTest.java | 2 +
.../apache/beam/sdk/transforms/NoOpDoFn.java | 20 +-
.../beam/sdk/transforms/OldDoFnContextTest.java | 69 +++
.../apache/beam/sdk/transforms/OldDoFnTest.java | 242 ++++++++
.../apache/beam/sdk/transforms/ParDoTest.java | 96 ++--
.../beam/sdk/transforms/PartitionTest.java | 1 +
.../apache/beam/sdk/transforms/SampleTest.java | 1 +
.../org/apache/beam/sdk/transforms/TopTest.java | 1 +
.../apache/beam/sdk/transforms/ViewTest.java | 398 ++++++-------
.../beam/sdk/transforms/WithTimestampsTest.java | 8 +-
.../display/DisplayDataEvaluatorTest.java | 6 +-
.../display/DisplayDataMatchersTest.java | 1 +
.../sdk/transforms/display/DisplayDataTest.java | 6 +-
.../sdk/transforms/join/CoGroupByKeyTest.java | 18 +-
.../sdk/transforms/windowing/NeverTest.java | 1 +
.../sdk/transforms/windowing/WindowTest.java | 6 +-
.../sdk/transforms/windowing/WindowingTest.java | 10 +-
.../beam/sdk/util/BucketingFunctionTest.java | 4 +-
.../beam/sdk/util/MovingFunctionTest.java | 4 +-
.../beam/sdk/util/SerializableUtilsTest.java | 1 -
.../apache/beam/sdk/util/SerializerTest.java | 1 -
.../apache/beam/sdk/util/StringUtilsTest.java | 16 +-
.../org/apache/beam/sdk/util/TriggerTester.java | 1 +
.../beam/sdk/util/common/CounterTest.java | 1 +
.../beam/sdk/values/PCollectionTupleTest.java | 4 +-
.../apache/beam/sdk/values/TypedPValueTest.java | 6 +-
.../beam/sdk/extensions/joinlibrary/Join.java | 8 +-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 18 +-
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 4 +-
.../beam/sdk/io/gcp/datastore/V1Beta3.java | 13 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 6 +-
.../sdk/io/gcp/bigtable/BigtableWriteIT.java | 4 +-
.../sdk/io/gcp/datastore/V1Beta3TestUtil.java | 6 +-
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 4 +-
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 8 +-
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 7 +-
.../sdk/transforms/WithTimestampsJava8Test.java | 4 +-
.../src/main/java/DebuggingWordCount.java | 4 +-
.../src/main/java/MinimalWordCount.java | 6 +-
.../src/main/java/WindowedWordCount.java | 6 +-
.../src/main/java/WordCount.java | 6 +-
.../main/java/common/PubsubFileInjector.java | 4 +-
.../src/main/java/StarterPipeline.java | 6 +-
.../src/main/java/it/pkg/StarterPipeline.java | 6 +-
.../transforms/DoFnReflectorBenchmark.java | 14 +-
263 files changed, 2196 insertions(+), 2151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 8d85d44..3c43152 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
@@ -106,8 +106,8 @@ import java.util.regex.Pattern;
* overridden with {@code --inputFile}.
*/
public class DebuggingWordCount {
- /** A DoFn that filters for a specific key based upon a regular expression. */
- public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
+ /** A OldDoFn that filters for a specific key based upon a regular expression. */
+ public static class FilterTextFn extends OldDoFn<KV<String, Long>, KV<String, Long>> {
/**
* Concept #1: The logger below uses the fully qualified class name of FilterTextFn
* as the logger. All log statements emitted by this logger will be referenced by this name
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
index 9f6d61a..ab0bb6d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -22,8 +22,8 @@ import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
@@ -89,10 +89,11 @@ public class MinimalWordCount {
// the input text (a set of Shakespeare's texts).
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
// Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
- // DoFn (defined in-line) on each element that tokenizes the text line into individual words.
+ // OldDoFn (defined in-line) on each element that tokenizes the text line into individua
+ // words.
// The ParDo returns a PCollection<String>, where each element is an individual word in
// Shakespeare's collected texts.
- .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
+ .apply("ExtractWords", ParDo.of(new OldDoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 7a4b29f..17f7da3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -103,14 +103,14 @@ public class WindowedWordCount {
static final int WINDOW_SIZE = 1; // Default window duration in minutes
/**
- * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for
+ * Concept #2: A OldDoFn that sets the data element timestamp. This is a silly method, just for
* this example, for the bounded data case.
*
* <p>Imagine that many ghosts of Shakespeare are all typing madly at the same time to recreate
* his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a
* 2-hour period.
*/
- static class AddTimestampFn extends DoFn<String, String> {
+ static class AddTimestampFn extends OldDoFn<String, String> {
private static final Duration RAND_RANGE = Duration.standardHours(2);
private final Instant minTimestamp;
@@ -130,8 +130,8 @@ public class WindowedWordCount {
}
}
- /** A DoFn that converts a Word and Count into a BigQuery table row. */
- static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> {
+ /** A OldDoFn that converts a Word and Count into a BigQuery table row. */
+ static class FormatAsTableRowFn extends OldDoFn<KV<String, Long>, TableRow> {
@Override
public void processElement(ProcessContext c) {
TableRow row = new TableRow()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index af16c44..274d1ad 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -26,8 +26,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -97,10 +97,10 @@ public class WordCount {
/**
* Concept #2: You can make your pipeline code less verbose by defining your DoFns statically out-
- * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
- * pipeline.
+ * of-line. This OldDoFn tokenizes lines of text into individual words; we pass it to a ParDo in
+ * the pipeline.
*/
- static class ExtractWordsFn extends DoFn<String, String> {
+ static class ExtractWordsFn extends OldDoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
index 15eda06..0a93521 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.IntraBundleParallelization;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.Transport;
import com.google.api.services.pubsub.Pubsub;
@@ -71,8 +71,8 @@ public class PubsubFileInjector {
}
}
- /** A DoFn that publishes non-empty lines to Google Cloud PubSub. */
- public static class Bound extends DoFn<String, Void> {
+ /** A OldDoFn that publishes non-empty lines to Google Cloud PubSub. */
+ public static class Bound extends OldDoFn<String, Void> {
private final String outputTopic;
private final String timestampLabelKey;
public transient Pubsub pubsub;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index c6272e8..7b44af8 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -36,9 +36,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Partition;
@@ -130,7 +130,7 @@ public class AutoComplete {
// Map the KV outputs of Count into our own CompletionCandiate class.
.apply("CreateCompletionCandidates", ParDo.of(
- new DoFn<KV<String, Long>, CompletionCandidate>() {
+ new OldDoFn<KV<String, Long>, CompletionCandidate>() {
@Override
public void processElement(ProcessContext c) {
c.output(new CompletionCandidate(c.element().getKey(), c.element().getValue()));
@@ -209,7 +209,7 @@ public class AutoComplete {
}
private static class FlattenTops
- extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
+ extends OldDoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
@Override
public void processElement(ProcessContext c) {
for (CompletionCandidate cc : c.element().getValue()) {
@@ -260,10 +260,10 @@ public class AutoComplete {
}
/**
- * A DoFn that keys each candidate by all its prefixes.
+ * A OldDoFn that keys each candidate by all its prefixes.
*/
private static class AllPrefixes
- extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
+ extends OldDoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
private final int minPrefix;
private final int maxPrefix;
public AllPrefixes(int minPrefix) {
@@ -341,7 +341,7 @@ public class AutoComplete {
/**
* Takes as input a set of strings, and emits each #hashtag found therein.
*/
- static class ExtractHashtags extends DoFn<String, String> {
+ static class ExtractHashtags extends OldDoFn<String, String> {
@Override
public void processElement(ProcessContext c) {
Matcher m = Pattern.compile("#\\S+").matcher(c.element());
@@ -351,7 +351,7 @@ public class AutoComplete {
}
}
- static class FormatForBigquery extends DoFn<KV<String, List<CompletionCandidate>>, TableRow> {
+ static class FormatForBigquery extends OldDoFn<KV<String, List<CompletionCandidate>>, TableRow> {
@Override
public void processElement(ProcessContext c) {
List<TableRow> completions = new ArrayList<>();
@@ -385,7 +385,7 @@ public class AutoComplete {
* Takes as input a the top candidates per prefix, and emits an entity
* suitable for writing to Datastore.
*/
- static class FormatForDatastore extends DoFn<KV<String, List<CompletionCandidate>>, Entity> {
+ static class FormatForDatastore extends OldDoFn<KV<String, List<CompletionCandidate>>, Entity> {
private String kind;
public FormatForDatastore(String kind) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index db646a5..b0c9ffd 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -55,8 +55,8 @@ import java.util.ArrayList;
*/
public class StreamingWordExtract {
- /** A DoFn that tokenizes lines of text into individual words. */
- static class ExtractWords extends DoFn<String, String> {
+ /** A OldDoFn that tokenizes lines of text into individual words. */
+ static class ExtractWords extends OldDoFn<String, String> {
@Override
public void processElement(ProcessContext c) {
String[] words = c.element().split("[^a-zA-Z']+");
@@ -68,8 +68,8 @@ public class StreamingWordExtract {
}
}
- /** A DoFn that uppercases a word. */
- static class Uppercase extends DoFn<String, String> {
+ /** A OldDoFn that uppercases a word. */
+ static class Uppercase extends OldDoFn<String, String> {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().toUpperCase());
@@ -79,7 +79,7 @@ public class StreamingWordExtract {
/**
* Converts strings into BigQuery rows.
*/
- static class StringToRowConverter extends DoFn<String, TableRow> {
+ static class StringToRowConverter extends OldDoFn<String, TableRow> {
/**
* In this example, put the whole string into single BigQuery field.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 8305314..470a689 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -30,9 +30,9 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Keys;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
@@ -225,7 +225,7 @@ public class TfIdf {
// of the words in the document associated with that that URI.
PCollection<KV<URI, String>> uriToWords = uriToContent
.apply("SplitWords", ParDo.of(
- new DoFn<KV<URI, String>, KV<URI, String>>() {
+ new OldDoFn<KV<URI, String>, KV<URI, String>>() {
@Override
public void processElement(ProcessContext c) {
URI uri = c.element().getKey();
@@ -268,7 +268,7 @@ public class TfIdf {
// by the URI key.
PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
.apply("ShiftKeys", ParDo.of(
- new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+ new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
@Override
public void processElement(ProcessContext c) {
URI uri = c.element().getKey().getKey();
@@ -307,7 +307,7 @@ public class TfIdf {
// divided by the total number of words in the document.
PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
.apply("ComputeTermFrequencies", ParDo.of(
- new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
@Override
public void processElement(ProcessContext c) {
URI uri = c.element().getKey();
@@ -328,11 +328,11 @@ public class TfIdf {
// documents in which the word appears divided by the total
// number of documents in the corpus. Note how the total number of
// documents is passed as a side input; the same value is
- // presented to each invocation of the DoFn.
+ // presented to each invocation of the OldDoFn.
PCollection<KV<String, Double>> wordToDf = wordToDocCount
.apply("ComputeDocFrequencies", ParDo
.withSideInputs(totalDocuments)
- .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+ .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() {
@Override
public void processElement(ProcessContext c) {
String word = c.element().getKey();
@@ -361,7 +361,7 @@ public class TfIdf {
// divided by the log of the document frequency.
PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = wordToUriAndTfAndDf
.apply("ComputeTfIdf", ParDo.of(
- new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
@Override
public void processElement(ProcessContext c) {
String word = c.element().getKey();
@@ -400,7 +400,7 @@ public class TfIdf {
@Override
public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
return wordToUriAndTfIdf
- .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
+ .apply("Format", ParDo.of(new OldDoFn<KV<String, KV<URI, Double>>, String>() {
@Override
public void processElement(ProcessContext c) {
c.output(String.format("%s,\t%s,\t%f",
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index f8af02a..0ed89d2 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -26,8 +26,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableComparator;
@@ -85,7 +85,7 @@ public class TopWikipediaSessions {
/**
* Extracts user and timestamp from a TableRow representing a Wikipedia edit.
*/
- static class ExtractUserAndTimestamp extends DoFn<TableRow, String> {
+ static class ExtractUserAndTimestamp extends OldDoFn<TableRow, String> {
@Override
public void processElement(ProcessContext c) {
TableRow row = c.element();
@@ -132,7 +132,7 @@ public class TopWikipediaSessions {
}
}
- static class SessionsToStringsDoFn extends DoFn<KV<String, Long>, KV<String, Long>>
+ static class SessionsToStringsDoFn extends OldDoFn<KV<String, Long>, KV<String, Long>>
implements RequiresWindowAccess {
@Override
@@ -142,7 +142,7 @@ public class TopWikipediaSessions {
}
}
- static class FormatOutputDoFn extends DoFn<List<KV<String, Long>>, String>
+ static class FormatOutputDoFn extends OldDoFn<List<KV<String, Long>>, String>
implements RequiresWindowAccess {
@Override
public void processElement(ProcessContext c) {
@@ -168,7 +168,7 @@ public class TopWikipediaSessions {
.apply(ParDo.of(new ExtractUserAndTimestamp()))
.apply("SampleUsers", ParDo.of(
- new DoFn<String, String>() {
+ new OldDoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * samplingThreshold) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index 7b1496f..9122015 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -145,12 +145,12 @@ public class TrafficMaxLaneFlow {
/**
* Extract the timestamp field from the input string, and use it as the element timestamp.
*/
- static class ExtractTimestamps extends DoFn<String, String> {
+ static class ExtractTimestamps extends OldDoFn<String, String> {
private static final DateTimeFormatter dateTimeFormat =
DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
@Override
- public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
+ public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {
String[] items = c.element().split(",");
if (items.length > 0) {
try {
@@ -170,7 +170,7 @@ public class TrafficMaxLaneFlow {
* information. The number of lanes for which data is present depends upon which freeway the data
* point comes from.
*/
- static class ExtractFlowInfoFn extends DoFn<String, KV<String, LaneInfo>> {
+ static class ExtractFlowInfoFn extends OldDoFn<String, KV<String, LaneInfo>> {
@Override
public void processElement(ProcessContext c) {
@@ -226,7 +226,7 @@ public class TrafficMaxLaneFlow {
* Format the results of the Max Lane flow calculation to a TableRow, to save to BigQuery.
* Add the timestamp from the window context.
*/
- static class FormatMaxesFn extends DoFn<KV<String, LaneInfo>, TableRow> {
+ static class FormatMaxesFn extends OldDoFn<KV<String, LaneInfo>, TableRow> {
@Override
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index ebf7b9a..30091b6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -29,8 +29,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
@@ -149,12 +149,12 @@ public class TrafficRoutes {
/**
* Extract the timestamp field from the input string, and use it as the element timestamp.
*/
- static class ExtractTimestamps extends DoFn<String, String> {
+ static class ExtractTimestamps extends OldDoFn<String, String> {
private static final DateTimeFormatter dateTimeFormat =
DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
@Override
- public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
+ public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {
String[] items = c.element().split(",");
String timestamp = tryParseTimestamp(items);
if (timestamp != null) {
@@ -171,7 +171,7 @@ public class TrafficRoutes {
* Filter out readings for the stations along predefined 'routes', and output
* (station, speed info) keyed on route.
*/
- static class ExtractStationSpeedFn extends DoFn<String, KV<String, StationSpeed>> {
+ static class ExtractStationSpeedFn extends OldDoFn<String, KV<String, StationSpeed>> {
@Override
public void processElement(ProcessContext c) {
@@ -200,7 +200,7 @@ public class TrafficRoutes {
* Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
*/
static class GatherStats
- extends DoFn<KV<String, Iterable<StationSpeed>>, KV<String, RouteInfo>> {
+ extends OldDoFn<KV<String, Iterable<StationSpeed>>, KV<String, RouteInfo>> {
@Override
public void processElement(ProcessContext c) throws IOException {
String route = c.element().getKey();
@@ -243,7 +243,7 @@ public class TrafficRoutes {
/**
* Format the results of the slowdown calculations to a TableRow, to save to BigQuery.
*/
- static class FormatStatsFn extends DoFn<KV<String, RouteInfo>, TableRow> {
+ static class FormatStatsFn extends OldDoFn<KV<String, RouteInfo>, TableRow> {
@Override
public void processElement(ProcessContext c) {
RouteInfo routeInfo = c.element().getValue();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index 665be01..6002b11 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -81,7 +81,7 @@ public class BigQueryTornadoes {
* Examines each row in the input table. If a tornado was recorded
* in that sample, the month in which it occurred is output.
*/
- static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
+ static class ExtractTornadoesFn extends OldDoFn<TableRow, Integer> {
@Override
public void processElement(ProcessContext c){
TableRow row = c.element();
@@ -95,7 +95,7 @@ public class BigQueryTornadoes {
* Prepares the data for writing to BigQuery by building a TableRow object containing an
* integer representation of month and the number of tornadoes that occurred in each month.
*/
- static class FormatCountsFn extends DoFn<KV<Integer, Long>, TableRow> {
+ static class FormatCountsFn extends OldDoFn<KV<Integer, Long>, TableRow> {
@Override
public void processElement(ProcessContext c) {
TableRow row = new TableRow()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 252f3cc..d0bce5d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -90,7 +90,7 @@ public class CombinePerKeyExamples {
* Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH,
* outputs word, play_name.
*/
- static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
+ static class ExtractLargeWordsFn extends OldDoFn<TableRow, KV<String, String>> {
private final Aggregator<Long, Long> smallerWords =
createAggregator("smallerWords", new Sum.SumLongFn());
@@ -114,7 +114,7 @@ public class CombinePerKeyExamples {
* Prepares the data for writing to BigQuery by building a TableRow object
* containing a word with a string listing the plays in which it appeared.
*/
- static class FormatShakespeareOutputFn extends DoFn<KV<String, String>, TableRow> {
+ static class FormatShakespeareOutputFn extends OldDoFn<KV<String, String>, TableRow> {
@Override
public void processElement(ProcessContext c) {
TableRow row = new TableRow()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
index 847523b..1850e89 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
@@ -32,8 +32,8 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import com.google.datastore.v1beta3.Entity;
@@ -44,7 +44,6 @@ import com.google.datastore.v1beta3.Value;
import java.util.Map;
import java.util.UUID;
-
import javax.annotation.Nullable;
/**
@@ -80,10 +79,10 @@ import javax.annotation.Nullable;
public class DatastoreWordCount {
/**
- * A DoFn that gets the content of an entity (one line in a
+ * A OldDoFn that gets the content of an entity (one line in a
* Shakespeare play) and converts it to a string.
*/
- static class GetContentFn extends DoFn<Entity, String> {
+ static class GetContentFn extends OldDoFn<Entity, String> {
@Override
public void processElement(ProcessContext c) {
Map<String, Value> props = c.element().getProperties();
@@ -109,9 +108,9 @@ public class DatastoreWordCount {
}
/**
- * A DoFn that creates entity for every line in Shakespeare.
+ * A OldDoFn that creates entity for every line in Shakespeare.
*/
- static class CreateEntityFn extends DoFn<String, Entity> {
+ static class CreateEntityFn extends OldDoFn<String, Entity> {
private final String namespace;
private final String kind;
private final Key ancestorKey;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index ea1dcf6..06fba77 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Mean;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
@@ -98,7 +98,7 @@ public class FilterExamples {
* Examines each row in the input table. Outputs only the subset of the cells this example
* is interested in-- the mean_temp and year, month, and day-- as a bigquery table row.
*/
- static class ProjectionFn extends DoFn<TableRow, TableRow> {
+ static class ProjectionFn extends OldDoFn<TableRow, TableRow> {
@Override
public void processElement(ProcessContext c){
TableRow row = c.element();
@@ -119,9 +119,9 @@ public class FilterExamples {
* Implements 'filter' functionality.
*
* <p>Examines each row in the input table. Outputs only rows from the month
- * monthFilter, which is passed in as a parameter during construction of this DoFn.
+ * monthFilter, which is passed in as a parameter during construction of this OldDoFn.
*/
- static class FilterSingleMonthDataFn extends DoFn<TableRow, TableRow> {
+ static class FilterSingleMonthDataFn extends OldDoFn<TableRow, TableRow> {
Integer monthFilter;
public FilterSingleMonthDataFn(Integer monthFilter) {
@@ -143,7 +143,7 @@ public class FilterExamples {
* Examines each row (weather reading) in the input table. Output the temperature
* reading for that row ('mean_temp').
*/
- static class ExtractTempFn extends DoFn<TableRow, Double> {
+ static class ExtractTempFn extends OldDoFn<TableRow, Double> {
@Override
public void processElement(ProcessContext c){
TableRow row = c.element();
@@ -191,7 +191,7 @@ public class FilterExamples {
PCollection<TableRow> filteredRows = monthFilteredRows
.apply("ParseAndFilter", ParDo
.withSideInputs(globalMeanTemp)
- .of(new DoFn<TableRow, TableRow>() {
+ .of(new OldDoFn<TableRow, TableRow>() {
@Override
public void processElement(ProcessContext c) {
Double meanTemp = Double.parseDouble(c.element().get("mean_temp").toString());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index 1b43cc2..5260c0d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -99,7 +99,7 @@ public class JoinExamples {
// country code 'key' -> string of <event info>, <country name>
PCollection<KV<String, String>> finalResultCollection =
kvpCollection.apply("Process", ParDo.of(
- new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+ new OldDoFn<KV<String, CoGbkResult>, KV<String, String>>() {
@Override
public void processElement(ProcessContext c) {
KV<String, CoGbkResult> e = c.element();
@@ -116,7 +116,7 @@ public class JoinExamples {
// write to GCS
PCollection<String> formattedResults = finalResultCollection
- .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
+ .apply("Format", ParDo.of(new OldDoFn<KV<String, String>, String>() {
@Override
public void processElement(ProcessContext c) {
String outputstring = "Country code: " + c.element().getKey()
@@ -131,7 +131,7 @@ public class JoinExamples {
* Examines each row (event) in the input table. Output a KV with the key the country
* code of the event, and the value a string encoding event information.
*/
- static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
+ static class ExtractEventDataFn extends OldDoFn<TableRow, KV<String, String>> {
@Override
public void processElement(ProcessContext c) {
TableRow row = c.element();
@@ -149,7 +149,7 @@ public class JoinExamples {
* Examines each row (country info) in the input table. Output a KV with the key the country
* code, and the value the country name.
*/
- static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
+ static class ExtractCountryInfoFn extends OldDoFn<TableRow, KV<String, String>> {
@Override
public void processElement(ProcessContext c) {
TableRow row = c.element();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
index a37690b..1bcb491 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -82,7 +82,7 @@ public class MaxPerKeyExamples {
* Examines each row (weather reading) in the input table. Output the month of the reading,
* and the mean_temp.
*/
- static class ExtractTempFn extends DoFn<TableRow, KV<Integer, Double>> {
+ static class ExtractTempFn extends OldDoFn<TableRow, KV<Integer, Double>> {
@Override
public void processElement(ProcessContext c) {
TableRow row = c.element();
@@ -96,7 +96,7 @@ public class MaxPerKeyExamples {
* Format the results to a TableRow, to save to BigQuery.
*
*/
- static class FormatMaxesFn extends DoFn<KV<Integer, Double>, TableRow> {
+ static class FormatMaxesFn extends OldDoFn<KV<Integer, Double>, TableRow> {
@Override
public void processElement(ProcessContext c) {
TableRow row = new TableRow()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index a0c5181..0be9921 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -28,9 +28,9 @@ import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterEach;
@@ -342,7 +342,7 @@ public class TriggerExample {
.apply(GroupByKey.<String, Integer>create());
PCollection<KV<String, String>> results = flowPerFreeway.apply(ParDo.of(
- new DoFn <KV<String, Iterable<Integer>>, KV<String, String>>() {
+ new OldDoFn<KV<String, Iterable<Integer>>, KV<String, String>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
@@ -365,7 +365,7 @@ public class TriggerExample {
* Format the results of the Total flow calculation to a TableRow, to save to BigQuery.
* Adds the triggerType, pane information, processing time and the window timestamp.
* */
- static class FormatTotalFlow extends DoFn<KV<String, String>, TableRow>
+ static class FormatTotalFlow extends OldDoFn<KV<String, String>, TableRow>
implements RequiresWindowAccess {
private String triggerType;
@@ -394,7 +394,7 @@ public class TriggerExample {
* Extract the freeway and total flow in a reading.
* Freeway is used as key since we are calculating the total flow for each freeway.
*/
- static class ExtractFlowInfo extends DoFn<String, KV<String, Integer>> {
+ static class ExtractFlowInfo extends OldDoFn<String, KV<String, Integer>> {
@Override
public void processElement(ProcessContext c) throws Exception {
String[] laneInfo = c.element().split(",");
@@ -471,7 +471,7 @@ public class TriggerExample {
* Add current time to each record.
* Also insert a delay at random to demo the triggers.
*/
- public static class InsertDelays extends DoFn<String, String> {
+ public static class InsertDelays extends OldDoFn<String, String> {
private static final double THRESHOLD = 0.001;
// MIN_DELAY and MAX_DELAY in minutes.
private static final int MIN_DELAY = 1;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
index ff117dc..26bf8fb 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
@@ -46,7 +46,7 @@ import java.util.List;
@RunWith(JUnit4.class)
public class WordCountTest {
- /** Example test that tests a specific DoFn. */
+ /** Example test that tests a specific OldDoFn. */
@Test
public void testExtractWordsFn() throws Exception {
DoFnTester<String, String> extractWordsFn =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
index b2ed9a2..6f68ce8 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
@@ -23,8 +23,8 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -171,7 +171,7 @@ public class AutoCompleteTest implements Serializable {
extends PTransform<PCollection<TimestampedValue<T>>, PCollection<T>> {
@Override
public PCollection<T> apply(PCollection<TimestampedValue<T>> input) {
- return input.apply(ParDo.of(new DoFn<TimestampedValue<T>, T>() {
+ return input.apply(ParDo.of(new OldDoFn<TimestampedValue<T>, T>() {
@Override
public void processElement(ProcessContext c) {
c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
index 6f58389..e72a9e8 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -141,7 +141,7 @@ public class TriggerExampleTest {
return Joiner.on(",").join(entries);
}
- static class FormatResults extends DoFn<TableRow, String> {
+ static class FormatResults extends OldDoFn<TableRow, String> {
@Override
public void processElement(ProcessContext c) throws Exception {
TableRow element = c.element();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 33b8727..b1407f6 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -27,10 +27,10 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Mean;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -126,7 +126,7 @@ public class GameStats extends LeaderBoard {
.apply("ProcessAndFilter", ParDo
// use the derived mean total score as a side input
.withSideInputs(globalMeanScore)
- .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+ .of(new OldDoFn<KV<String, Integer>, KV<String, Integer>>() {
private final Aggregator<Long, Long> numSpammerUsers =
createAggregator("SpammerUsers", new Sum.SumLongFn());
@Override
@@ -149,7 +149,7 @@ public class GameStats extends LeaderBoard {
/**
* Calculate and output an element's session duration.
*/
- private static class UserSessionInfoFn extends DoFn<KV<String, Integer>, Integer>
+ private static class UserSessionInfoFn extends OldDoFn<KV<String, Integer>, Integer>
implements RequiresWindowAccess {
@Override
@@ -281,7 +281,7 @@ public class GameStats extends LeaderBoard {
// Filter out the detected spammer users, using the side input derived above.
.apply("FilterOutSpammers", ParDo
.withSideInputs(spammersView)
- .of(new DoFn<GameActionInfo, GameActionInfo>() {
+ .of(new OldDoFn<GameActionInfo, GameActionInfo>() {
@Override
public void processElement(ProcessContext c) {
// If the user is not in the spammers Map, output the data element.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 28614cb..00dc8a4 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -28,8 +28,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -123,7 +123,7 @@ public class UserScore {
* user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
* The human-readable time string is not used here.
*/
- static class ParseEventFn extends DoFn<String, GameActionInfo> {
+ static class ParseEventFn extends OldDoFn<String, GameActionInfo> {
// Log and count parse errors.
private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index 36ed195..6af6e15 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -66,10 +66,10 @@ public class WriteToBigQuery<T>
// The BigQuery 'type' of the field
private String fieldType;
// A lambda function to generate the field value
- private SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fieldFn;
+ private SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fieldFn;
public FieldInfo(String fieldType,
- SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fieldFn) {
+ SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fieldFn) {
this.fieldType = fieldType;
this.fieldFn = fieldFn;
}
@@ -78,12 +78,12 @@ public class WriteToBigQuery<T>
return this.fieldType;
}
- SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> getFieldFn() {
+ SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> getFieldFn() {
return this.fieldFn;
}
}
/** Convert each key/score pair into a BigQuery TableRow as specified by fieldFn. */
- protected class BuildRowFn extends DoFn<T, TableRow> {
+ protected class BuildRowFn extends OldDoFn<T, TableRow> {
@Override
public void processElement(ProcessContext c) {
@@ -92,7 +92,7 @@ public class WriteToBigQuery<T>
for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
String key = entry.getKey();
FieldInfo<T> fcnInfo = entry.getValue();
- SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fcn =
+ SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fcn =
fcnInfo.getFieldFn();
row.set(key, fcn.apply(c));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index b4c9b4a..c59fd61 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -20,8 +20,8 @@ package org.apache.beam.examples.complete.game.utils;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
@@ -45,7 +45,7 @@ public class WriteWindowedToBigQuery<T>
}
/** Convert each key/score pair into a BigQuery TableRow. */
- protected class BuildRowFn extends DoFn<T, TableRow>
+ protected class BuildRowFn extends OldDoFn<T, TableRow>
implements RequiresWindowAccess {
@Override
@@ -55,7 +55,7 @@ public class WriteWindowedToBigQuery<T>
for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
String key = entry.getKey();
FieldInfo<T> fcnInfo = entry.getValue();
- SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fcn =
+ SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fcn =
fcnInfo.getFieldFn();
row.set(key, fcn.apply(c));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
index cc3e7fa..01efad8 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
@@ -83,7 +83,7 @@ public class UserScoreTest implements Serializable {
KV.of("AndroidGreenKookaburra", 23),
KV.of("BisqueBilby", 14));
- /** Test the ParseEventFn DoFn. */
+ /** Test the ParseEventFn OldDoFn. */
@Test
public void testParseEventFn() throws Exception {
DoFnTester<String, GameActionInfo> parseEventFn =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 0d320bc..7cdab00 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.core;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
@@ -41,10 +41,10 @@ import org.apache.beam.sdk.values.KV;
@SystemDoFnInternal
public class GroupAlsoByWindowViaWindowSetDoFn<
K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
- extends DoFn<RinT, KV<K, OutputT>> implements ReduceFnExecutor<K, InputT, OutputT, W> {
+ extends OldDoFn<RinT, KV<K, OutputT>> implements ReduceFnExecutor<K, InputT, OutputT, W> {
public static <K, InputT, OutputT, W extends BoundedWindow>
- DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
+ OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
WindowingStrategy<?, W> strategy,
StateInternalsFactory<K> stateInternalsFactory,
SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
@@ -99,11 +99,11 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
}
@Override
- public DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
+ public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
// Safe contravariant cast
@SuppressWarnings("unchecked")
- DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asFn =
- (DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this;
+ OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asFn =
+ (OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this;
return asFn;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 5821e73..3ce0c06 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.core;
import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
+
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -47,7 +48,6 @@ import com.google.common.collect.Lists;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
index d40b007..739db45 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.util;
import static com.google.common.base.Preconditions.checkNotNull;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -32,14 +32,14 @@ import org.joda.time.Instant;
import java.util.Collection;
/**
- * {@link DoFn} that tags elements of a {@link PCollection} with windows, according to the provided
- * {@link WindowFn}.
+ * {@link OldDoFn} that tags elements of a {@link PCollection} with windows, according to the
+ * provided {@link WindowFn}.
*
* @param <T> Type of elements being windowed
* @param <W> Window type
*/
@SystemDoFnInternal
-public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T>
+public class AssignWindowsDoFn<T, W extends BoundedWindow> extends OldDoFn<T, T>
implements RequiresWindowAccess {
private WindowFn<? super T, W> fn;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java
index 4ec8920..49206d1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java
@@ -18,41 +18,42 @@
package org.apache.beam.sdk.util;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext;
import org.apache.beam.sdk.values.KV;
/**
- * An wrapper interface that represents the execution of a {@link DoFn}.
+ * An wrapper interface that represents the execution of a {@link OldDoFn}.
*/
public interface DoFnRunner<InputT, OutputT> {
/**
- * Prepares and calls {@link DoFn#startBundle}.
+ * Prepares and calls {@link OldDoFn#startBundle}.
*/
public void startBundle();
/**
- * Calls {@link DoFn#processElement} with a {@link ProcessContext} containing the current element.
+ * Calls {@link OldDoFn#processElement} with a {@link ProcessContext} containing the current
+ * element.
*/
public void processElement(WindowedValue<InputT> elem);
/**
- * Calls {@link DoFn#finishBundle} and performs additional tasks, such as
+ * Calls {@link OldDoFn#finishBundle} and performs additional tasks, such as
* flushing in-memory states.
*/
public void finishBundle();
/**
- * An internal interface for signaling that a {@link DoFn} requires late data dropping.
+ * An internal interface for signaling that a {@link OldDoFn} requires late data dropping.
*/
public interface ReduceFnExecutor<K, InputT, OutputT, W> {
/**
- * Gets this object as a {@link DoFn}.
+ * Gets this object as a {@link OldDoFn}.
*
- * Most implementors of this interface are expected to be {@link DoFn} instances, and will
+ * Most implementors of this interface are expected to be {@link OldDoFn} instances, and will
* return themselves.
*/
- DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn();
+ OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn();
/**
* Returns an aggregator that tracks elements that are dropped due to being late.