You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/06/24 06:21:18 UTC
[1/3] incubator-beam git commit: Remove ParDo.named on all variants
Repository: incubator-beam
Updated Branches:
refs/heads/master 7745b921f -> 41faee4f9
Remove ParDo.named on all variants
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4f050bf5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4f050bf5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4f050bf5
Branch: refs/heads/master
Commit: 4f050bf56832b1cdeda7af1a58589a1ed952a41a
Parents: d7f2810
Author: Ben Chambers <bc...@google.com>
Authored: Thu Jun 23 09:57:55 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Jun 23 22:55:43 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/ParDo.java | 70 +-------------------
1 file changed, 3 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f050bf5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index cb7d372..16dfcac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -123,16 +123,13 @@ import java.util.List;
* a unique name - which may not be stable across pipeline revision -
* will be generated, based on the transform name.
*
- * <p>If a {@link ParDo} is applied exactly once inlined, then
- * it can be given a name via {@link #named}. For example:
+ * <p>For example:
*
* <pre> {@code
* PCollection<String> words =
- * lines.apply(ParDo.named("ExtractWords")
- * .of(new DoFn<String, String>() { ... }));
+ * lines.apply("ExtractWords", ParDo.of(new DoFn<String, String>() { ... }));
* PCollection<Integer> wordLengths =
- * words.apply(ParDo.named("ComputeWordLengths")
- * .of(new DoFn<String, Integer>() { ... }));
+ * words.apply("ComputeWordLengths", ParDo.of(new DoFn<String, Integer>() { ... }));
* } </pre>
*
* <h2>Side Inputs</h2>
@@ -437,21 +434,6 @@ import java.util.List;
public class ParDo {
/**
- * Creates a {@link ParDo} {@link PTransform} with the given name.
- *
- * <p>See the discussion of naming above for more explanation.
- *
- * <p>The resulting {@link PTransform} is incomplete, and its
- * input/output types are not yet bound. Use
- * {@link ParDo.Unbound#of} to specify the {@link DoFn} to
- * invoke, which will also bind the input/output types of this
- * {@link PTransform}.
- */
- public static Unbound named(String name) {
- return new Unbound().named(name);
- }
-
- /**
* Creates a {@link ParDo} {@link PTransform} with the given
* side inputs.
*
@@ -588,17 +570,6 @@ public class ParDo {
/**
* Returns a new {@link ParDo} transform that's like this
- * transform but with the specified name. Does not modify this
- * transform. The resulting transform is still incomplete.
- *
- * <p>See the discussion of naming above for more explanation.
- */
- public Unbound named(String name) {
- return new Unbound(name, sideInputs);
- }
-
- /**
- * Returns a new {@link ParDo} transform that's like this
* transform but with the specified additional side inputs.
* Does not modify this transform. The resulting transform is
* still incomplete.
@@ -703,17 +674,6 @@ public class ParDo {
/**
* Returns a new {@link ParDo} {@link PTransform} that's like this
- * {@link PTransform} but with the specified name. Does not
- * modify this {@link PTransform}.
- *
- * <p>See the discussion of Naming above for more explanation.
- */
- public Bound<InputT, OutputT> named(String name) {
- return new Bound<>(name, sideInputs, fn, fnClass);
- }
-
- /**
- * Returns a new {@link ParDo} {@link PTransform} that's like this
* {@link PTransform} but with the specified additional side inputs. Does not
* modify this {@link PTransform}.
*
@@ -833,18 +793,6 @@ public class ParDo {
/**
* Returns a new multi-output {@link ParDo} transform that's like
- * this transform but with the specified name. Does not modify
- * this transform. The resulting transform is still incomplete.
- *
- * <p>See the discussion of Naming above for more explanation.
- */
- public UnboundMulti<OutputT> named(String name) {
- return new UnboundMulti<>(
- name, sideInputs, mainOutputTag, sideOutputTags);
- }
-
- /**
- * Returns a new multi-output {@link ParDo} transform that's like
* this transform but with the specified side inputs. Does not
* modify this transform. The resulting transform is still
* incomplete.
@@ -942,18 +890,6 @@ public class ParDo {
/**
* Returns a new multi-output {@link ParDo} {@link PTransform}
- * that's like this {@link PTransform} but with the specified
- * name. Does not modify this {@link PTransform}.
- *
- * <p>See the discussion of Naming above for more explanation.
- */
- public BoundMulti<InputT, OutputT> named(String name) {
- return new BoundMulti<>(
- name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass);
- }
-
- /**
- * Returns a new multi-output {@link ParDo} {@link PTransform}
* that's like this {@link PTransform} but with the specified additional side
* inputs. Does not modify this {@link PTransform}.
*
[3/3] incubator-beam git commit: This closes #522
Posted by bc...@apache.org.
This closes #522
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41faee4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41faee4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41faee4f
Branch: refs/heads/master
Commit: 41faee4f9cbe29ad6a80742789a0004abe59e684
Parents: 7745b92 4f050bf
Author: bchambers <bc...@google.com>
Authored: Thu Jun 23 22:55:46 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Jun 23 22:55:46 2016 -0700
----------------------------------------------------------------------
.../apache/beam/examples/MinimalWordCount.java | 2 +-
.../beam/examples/complete/AutoComplete.java | 4 +-
.../apache/beam/examples/complete/TfIdf.java | 13 ++--
.../examples/complete/TopWikipediaSessions.java | 7 +-
.../beam/examples/cookbook/FilterExamples.java | 3 +-
.../beam/examples/cookbook/JoinExamples.java | 4 +-
.../beam/examples/cookbook/TriggerExample.java | 2 +-
.../beam/examples/complete/game/GameStats.java | 7 +-
.../examples/complete/game/HourlyTeamScore.java | 4 +-
.../examples/complete/game/LeaderBoard.java | 4 +-
.../beam/examples/complete/game/UserScore.java | 2 +-
.../complete/game/utils/WriteToBigQuery.java | 2 +-
.../game/utils/WriteWindowedToBigQuery.java | 2 +-
.../complete/game/HourlyTeamScoreTest.java | 2 +-
.../beam/runners/flink/examples/TFIDF.java | 48 +++++++-------
.../flink/examples/streaming/AutoComplete.java | 4 +-
.../flink/examples/streaming/JoinExamples.java | 4 +-
.../beam/runners/dataflow/DataflowRunner.java | 5 +-
.../dataflow/internal/AssignWindows.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 6 +-
.../sdk/io/BoundedReadFromUnboundedSource.java | 3 +-
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 24 +++----
.../beam/sdk/io/PubsubUnboundedSource.java | 5 +-
.../org/apache/beam/sdk/testing/PAssert.java | 6 +-
.../org/apache/beam/sdk/transforms/Combine.java | 48 +++++++-------
.../org/apache/beam/sdk/transforms/Count.java | 2 +-
.../beam/sdk/transforms/FlatMapElements.java | 2 +-
.../org/apache/beam/sdk/transforms/Flatten.java | 2 +-
.../org/apache/beam/sdk/transforms/Keys.java | 13 ++--
.../org/apache/beam/sdk/transforms/KvSwap.java | 15 ++---
.../apache/beam/sdk/transforms/MapElements.java | 2 +-
.../apache/beam/sdk/transforms/PTransform.java | 7 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 70 +-------------------
.../beam/sdk/transforms/RemoveDuplicates.java | 13 ++--
.../org/apache/beam/sdk/transforms/Values.java | 13 ++--
.../apache/beam/sdk/transforms/WithKeys.java | 15 ++---
.../beam/sdk/transforms/WithTimestamps.java | 2 +-
.../beam/sdk/transforms/join/CoGroupByKey.java | 10 ++-
.../beam/sdk/transforms/windowing/Window.java | 21 +++---
.../org/apache/beam/sdk/util/Reshuffle.java | 3 +-
.../apache/beam/sdk/util/ValueWithRecordId.java | 19 ++----
.../apache/beam/sdk/transforms/ParDoTest.java | 27 ++------
.../sdk/transforms/windowing/WindowingTest.java | 8 +--
.../src/main/java/MinimalWordCount.java | 4 +-
44 files changed, 180 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Remove uses of ParDo.named
Posted by bc...@apache.org.
Remove uses of ParDo.named
Also removed a few other uses of .named() methods that were nearby
use-sites being cleaned up.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d7f2810a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d7f2810a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d7f2810a
Branch: refs/heads/master
Commit: d7f2810ad24e805793d6d43a639a48845aac4e48
Parents: 7745b92
Author: Ben Chambers <bc...@google.com>
Authored: Thu Jun 23 09:53:40 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Jun 23 22:55:43 2016 -0700
----------------------------------------------------------------------
.../apache/beam/examples/MinimalWordCount.java | 2 +-
.../beam/examples/complete/AutoComplete.java | 4 +-
.../apache/beam/examples/complete/TfIdf.java | 13 +++---
.../examples/complete/TopWikipediaSessions.java | 7 ++-
.../beam/examples/cookbook/FilterExamples.java | 3 +-
.../beam/examples/cookbook/JoinExamples.java | 4 +-
.../beam/examples/cookbook/TriggerExample.java | 2 +-
.../beam/examples/complete/game/GameStats.java | 7 ++-
.../examples/complete/game/HourlyTeamScore.java | 4 +-
.../examples/complete/game/LeaderBoard.java | 4 +-
.../beam/examples/complete/game/UserScore.java | 2 +-
.../complete/game/utils/WriteToBigQuery.java | 2 +-
.../game/utils/WriteWindowedToBigQuery.java | 2 +-
.../complete/game/HourlyTeamScoreTest.java | 2 +-
.../beam/runners/flink/examples/TFIDF.java | 48 ++++++++++----------
.../flink/examples/streaming/AutoComplete.java | 4 +-
.../flink/examples/streaming/JoinExamples.java | 4 +-
.../beam/runners/dataflow/DataflowRunner.java | 5 +-
.../dataflow/internal/AssignWindows.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 6 +--
.../sdk/io/BoundedReadFromUnboundedSource.java | 3 +-
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 24 +++++-----
.../beam/sdk/io/PubsubUnboundedSource.java | 5 +-
.../org/apache/beam/sdk/testing/PAssert.java | 6 +--
.../org/apache/beam/sdk/transforms/Combine.java | 48 ++++++++++----------
.../org/apache/beam/sdk/transforms/Count.java | 2 +-
.../beam/sdk/transforms/FlatMapElements.java | 2 +-
.../org/apache/beam/sdk/transforms/Flatten.java | 2 +-
.../org/apache/beam/sdk/transforms/Keys.java | 13 +++---
.../org/apache/beam/sdk/transforms/KvSwap.java | 15 +++---
.../apache/beam/sdk/transforms/MapElements.java | 2 +-
.../apache/beam/sdk/transforms/PTransform.java | 7 ++-
.../beam/sdk/transforms/RemoveDuplicates.java | 13 +++---
.../org/apache/beam/sdk/transforms/Values.java | 13 +++---
.../apache/beam/sdk/transforms/WithKeys.java | 15 +++---
.../beam/sdk/transforms/WithTimestamps.java | 2 +-
.../beam/sdk/transforms/join/CoGroupByKey.java | 10 ++--
.../beam/sdk/transforms/windowing/Window.java | 21 ++++-----
.../org/apache/beam/sdk/util/Reshuffle.java | 3 +-
.../apache/beam/sdk/util/ValueWithRecordId.java | 19 +++-----
.../apache/beam/sdk/transforms/ParDoTest.java | 27 +++--------
.../sdk/transforms/windowing/WindowingTest.java | 8 ++--
.../src/main/java/MinimalWordCount.java | 4 +-
43 files changed, 177 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
index 355a1ff..2c67609 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -92,7 +92,7 @@ public class MinimalWordCount {
// DoFn (defined in-line) on each element that tokenizes the text line into individual words.
// The ParDo returns a PCollection<String>, where each element is an individual word in
// Shakespeare's collected texts.
- .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
+ .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index c6893f4..2732aa5 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -130,7 +130,7 @@ public class AutoComplete {
.apply(new Count.PerElement<String>())
// Map the KV outputs of Count into our own CompletionCandiate class.
- .apply(ParDo.named("CreateCompletionCandidates").of(
+ .apply("CreateCompletionCandidates", ParDo.of(
new DoFn<KV<String, Long>, CompletionCandidate>() {
@Override
public void processElement(ProcessContext c) {
@@ -481,7 +481,7 @@ public class AutoComplete {
if (options.getOutputToDatastore()) {
toWrite
- .apply(ParDo.named("FormatForDatastore").of(new FormatForDatastore(options.getKind())))
+ .apply("FormatForDatastore", ParDo.of(new FormatForDatastore(options.getKind())))
.apply(DatastoreIO.writeTo(MoreObjects.firstNonNull(
options.getOutputProject(), options.getProject())));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 73f3323..23653d6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -225,7 +225,7 @@ public class TfIdf {
// Create a collection of pairs mapping a URI to each
// of the words in the document associated with that that URI.
PCollection<KV<URI, String>> uriToWords = uriToContent
- .apply(ParDo.named("SplitWords").of(
+ .apply("SplitWords", ParDo.of(
new DoFn<KV<URI, String>, KV<URI, String>>() {
@Override
public void processElement(ProcessContext c) {
@@ -268,7 +268,7 @@ public class TfIdf {
// from URI to (word, count) pairs, to prepare for a join
// by the URI key.
PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
- .apply(ParDo.named("ShiftKeys").of(
+ .apply("ShiftKeys", ParDo.of(
new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
@Override
public void processElement(ProcessContext c) {
@@ -307,7 +307,7 @@ public class TfIdf {
// is simply the number of times that word occurs in the document
// divided by the total number of words in the document.
PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
- .apply(ParDo.named("ComputeTermFrequencies").of(
+ .apply("ComputeTermFrequencies", ParDo.of(
new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
@Override
public void processElement(ProcessContext c) {
@@ -331,8 +331,7 @@ public class TfIdf {
// documents is passed as a side input; the same value is
// presented to each invocation of the DoFn.
PCollection<KV<String, Double>> wordToDf = wordToDocCount
- .apply(ParDo
- .named("ComputeDocFrequencies")
+ .apply("ComputeDocFrequencies", ParDo
.withSideInputs(totalDocuments)
.of(new DoFn<KV<String, Long>, KV<String, Double>>() {
@Override
@@ -362,7 +361,7 @@ public class TfIdf {
// here we use a basic version that is the term frequency
// divided by the log of the document frequency.
PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = wordToUriAndTfAndDf
- .apply(ParDo.named("ComputeTfIdf").of(
+ .apply("ComputeTfIdf", ParDo.of(
new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
@Override
public void processElement(ProcessContext c) {
@@ -402,7 +401,7 @@ public class TfIdf {
@Override
public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
return wordToUriAndTfIdf
- .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() {
+ .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
@Override
public void processElement(ProcessContext c) {
c.output(String.format("%s,\t%s,\t%f",
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index 2c0d0eb..5d95e3f 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -168,7 +168,7 @@ public class TopWikipediaSessions {
return input
.apply(ParDo.of(new ExtractUserAndTimestamp()))
- .apply(ParDo.named("SampleUsers").of(
+ .apply("SampleUsers", ParDo.of(
new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
@@ -179,10 +179,9 @@ public class TopWikipediaSessions {
}))
.apply(new ComputeSessions())
-
- .apply(ParDo.named("SessionsToStrings").of(new SessionsToStringsDoFn()))
+ .apply("SessionsToStrings", ParDo.of(new SessionsToStringsDoFn()))
.apply(new TopPerMonth())
- .apply(ParDo.named("FormatOutput").of(new FormatOutputDoFn()));
+ .apply("FormatOutput", ParDo.of(new FormatOutputDoFn()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index a669b0c..017be21 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -189,8 +189,7 @@ public class FilterExamples {
// that is computed earlier in pipeline execution.
// We'll only output readings with temperatures below this mean.
PCollection<TableRow> filteredRows = monthFilteredRows
- .apply(ParDo
- .named("ParseAndFilter")
+ .apply("ParseAndFilter", ParDo
.withSideInputs(globalMeanTemp)
.of(new DoFn<TableRow, TableRow>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index e8f1f01..3c26123 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -98,7 +98,7 @@ public class JoinExamples {
// Process the CoGbkResult elements generated by the CoGroupByKey transform.
// country code 'key' -> string of <event info>, <country name>
PCollection<KV<String, String>> finalResultCollection =
- kvpCollection.apply(ParDo.named("Process").of(
+ kvpCollection.apply("Process", ParDo.of(
new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
@Override
public void processElement(ProcessContext c) {
@@ -116,7 +116,7 @@ public class JoinExamples {
// write to GCS
PCollection<String> formattedResults = finalResultCollection
- .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
+ .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
@Override
public void processElement(ProcessContext c) {
String outputstring = "Country code: " + c.element().getKey()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index c614550..ab1fb66 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -494,7 +494,7 @@ public class TriggerExample {
Pipeline injectorPipeline = Pipeline.create(copiedOptions);
injectorPipeline
.apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
- .apply(ParDo.named("InsertRandomDelays").of(new InsertDelays()))
+ .apply("InsertRandomDelays", ParDo.of(new InsertDelays()))
.apply(IntraBundleParallelization.of(PubsubFileInjector
.withTimestampLabelKey(PUBSUB_TIMESTAMP_LABEL_KEY)
.publish(options.getPubsubTopic()))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index c8bcc8c..ad8b49e 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -123,8 +123,7 @@ public class GameStats extends LeaderBoard {
// Filter the user sums using the global mean.
PCollection<KV<String, Integer>> filtered = sumScores
- .apply(ParDo
- .named("ProcessAndFilter")
+ .apply("ProcessAndFilter", ParDo
// use the derived mean total score as a side input
.withSideInputs(globalMeanScore)
.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
@@ -249,7 +248,7 @@ public class GameStats extends LeaderBoard {
// Read Events from Pub/Sub using custom timestamps
PCollection<GameActionInfo> rawEvents = pipeline
.apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
- .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));
+ .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
// Extract username/score pairs from the event stream
PCollection<KV<String, Integer>> userEvents =
@@ -284,7 +283,7 @@ public class GameStats extends LeaderBoard {
Duration.standardMinutes(options.getFixedWindowDuration())))
)
// Filter out the detected spammer users, using the side input derived above.
- .apply(ParDo.named("FilterOutSpammers")
+ .apply("FilterOutSpammers", ParDo
.withSideInputs(spammersView)
.of(new DoFn<GameActionInfo, GameActionInfo>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index c5c2fb5..7a808ac 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -109,9 +109,11 @@ public class HourlyTeamScore extends UserScore {
String getStopMin();
void setStopMin(String value);
+ @Override
@Description("The BigQuery table name. Should not already exist.")
@Default.String("hourly_team_score")
String getTableName();
+ @Override
void setTableName(String value);
}
@@ -155,7 +157,7 @@ public class HourlyTeamScore extends UserScore {
// Read 'gaming' events from a text file.
pipeline.apply(TextIO.Read.from(options.getInput()))
// Parse the incoming data.
- .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
+ .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
// Filter out data before and after the given times so that it is not included
// in the calculations. As we collect data in batches (say, by day), the batch for the day
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 12d2729..2c608aa 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -119,9 +119,11 @@ public class LeaderBoard extends HourlyTeamScore {
Integer getAllowedLateness();
void setAllowedLateness(Integer value);
+ @Override
@Description("Prefix used for the BigQuery table names")
@Default.String("leaderboard")
String getTableName();
+ @Override
void setTableName(String value);
}
@@ -183,7 +185,7 @@ public class LeaderBoard extends HourlyTeamScore {
// data elements, and parse the data.
PCollection<GameActionInfo> gameEvents = pipeline
.apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
- .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));
+ .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
// [START DocInclude_WindowAndTrigger]
// Extract team/score pairs from the event stream, using hour-long windows by default.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 97b6929..28614cb 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -226,7 +226,7 @@ public class UserScore {
// Read events from a text file and parse them.
pipeline.apply(TextIO.Read.from(options.getInput()))
- .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
+ .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
// Extract and sum username/score pairs from the event data.
.apply("ExtractUserScore", new ExtractAndSumScore("user"))
.apply("WriteUserScoreSums",
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index 5897e44..5b472d7 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -115,7 +115,7 @@ public class WriteToBigQuery<T>
@Override
public PDone apply(PCollection<T> teamAndScore) {
return teamAndScore
- .apply(ParDo.named("ConvertToRow").of(new BuildRowFn()))
+ .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
.apply(BigQueryIO.Write
.to(getTable(teamAndScore.getPipeline(),
tableName))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index 27697db..b1ccaed 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -66,7 +66,7 @@ public class WriteWindowedToBigQuery<T>
@Override
public PDone apply(PCollection<T> teamAndScore) {
return teamAndScore
- .apply(ParDo.named("ConvertToRow").of(new BuildRowFn()))
+ .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
.apply(BigQueryIO.Write
.to(getTable(teamAndScore.getPipeline(),
tableName))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
index 4254902..b917b4c 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
@@ -94,7 +94,7 @@ public class HourlyTeamScoreTest implements Serializable {
PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
PCollection<KV<String, Integer>> output = input
- .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
+ .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
.apply("FilterStartTime", Filter.by(
(GameActionInfo gInfo)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index af920aa..084ac7c 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -231,26 +231,25 @@ public class TFIDF {
// Create a collection of pairs mapping a URI to each
// of the words in the document associated with that that URI.
PCollection<KV<URI, String>> uriToWords = uriToContent
- .apply(ParDo.named("SplitWords").of(
- new DoFn<KV<URI, String>, KV<URI, String>>() {
- private static final long serialVersionUID = 0;
+ .apply("SplitWords", ParDo.of(new DoFn<KV<URI, String>, KV<URI, String>>() {
+ private static final long serialVersionUID = 0;
- @Override
- public void processElement(ProcessContext c) {
- URI uri = c.element().getKey();
- String line = c.element().getValue();
- for (String word : line.split("\\W+")) {
- // Log INFO messages when the word \u201clove\u201d is found.
- if (word.toLowerCase().equals("love")) {
- LOG.info("Found {}", word.toLowerCase());
- }
-
- if (!word.isEmpty()) {
- c.output(KV.of(uri, word.toLowerCase()));
- }
- }
+ @Override
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey();
+ String line = c.element().getValue();
+ for (String word : line.split("\\W+")) {
+ // Log INFO messages when the word \u201clove\u201d is found.
+ if (word.toLowerCase().equals("love")) {
+ LOG.info("Found {}", word.toLowerCase());
}
- }));
+
+ if (!word.isEmpty()) {
+ c.output(KV.of(uri, word.toLowerCase()));
+ }
+ }
+ }
+ }));
// Compute a mapping from each word to the total
// number of documents in which it appears.
@@ -276,7 +275,7 @@ public class TFIDF {
// from URI to (word, count) pairs, to prepare for a join
// by the URI key.
PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
- .apply(ParDo.named("ShiftKeys").of(
+ .apply("ShiftKeys", ParDo.of(
new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
private static final long serialVersionUID = 0;
@@ -317,7 +316,7 @@ public class TFIDF {
// is simply the number of times that word occurs in the document
// divided by the total number of words in the document.
PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
- .apply(ParDo.named("ComputeTermFrequencies").of(
+ .apply("ComputeTermFrequencies", ParDo.of(
new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
private static final long serialVersionUID = 0;
@@ -343,8 +342,7 @@ public class TFIDF {
// documents is passed as a side input; the same value is
// presented to each invocation of the DoFn.
PCollection<KV<String, Double>> wordToDf = wordToDocCount
- .apply(ParDo
- .named("ComputeDocFrequencies")
+ .apply("ComputeDocFrequencies", ParDo
.withSideInputs(totalDocuments)
.of(new DoFn<KV<String, Long>, KV<String, Double>>() {
private static final long serialVersionUID = 0;
@@ -377,9 +375,9 @@ public class TFIDF {
// divided by the log of the document frequency.
return wordToUriAndTfAndDf
- .apply(ParDo.named("ComputeTfIdf").of(
+ .apply("ComputeTfIdf", ParDo.of(
new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
- private static final long serialVersionUID1 = 0;
+ private static final long serialVersionUID = 0;
@Override
public void processElement(ProcessContext c) {
@@ -419,7 +417,7 @@ public class TFIDF {
@Override
public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
return wordToUriAndTfIdf
- .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() {
+ .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
private static final long serialVersionUID = 0;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 9299955..ed11781 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -92,7 +92,7 @@ public class AutoComplete {
.apply(new Count.PerElement<String>())
// Map the KV outputs of Count into our own CompletionCandiate class.
- .apply(ParDo.named("CreateCompletionCandidates").of(
+ .apply("CreateCompletionCandidates", ParDo.of(
new DoFn<KV<String, Long>, CompletionCandidate>() {
private static final long serialVersionUID = 0;
@@ -395,7 +395,7 @@ public class AutoComplete {
.apply(ComputeTopCompletions.top(10, options.getRecursive()));
toWrite
- .apply(ParDo.named("FormatForPerTaskFile").of(new FormatForPerTaskLocalFile()))
+ .apply("FormatForPerTaskFile", ParDo.of(new FormatForPerTaskLocalFile()))
.apply(TextIO.Write.to("./outputAutoComplete.txt"));
p.run();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index b447a20..0828ddc 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -77,7 +77,7 @@ public class JoinExamples {
// Process the CoGbkResult elements generated by the CoGroupByKey transform.
// country code 'key' -> string of <event info>, <country name>
PCollection<KV<String, String>> finalResultCollection =
- kvpCollection.apply(ParDo.named("Process").of(
+ kvpCollection.apply("Process", ParDo.of(
new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
private static final long serialVersionUID = 0;
@@ -100,7 +100,7 @@ public class JoinExamples {
}));
return finalResultCollection
- .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
+ .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
private static final long serialVersionUID = 0;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 5818ba5..d47d285 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -179,6 +179,7 @@ import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+
import javax.annotation.Nullable;
/**
@@ -2540,7 +2541,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
.apply(new Deduplicate<T>());
} else {
return Pipeline.applyTransform(input, new ReadWithIds<T>(source))
- .apply(ValueWithRecordId.<T>stripIds());
+ .apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn<T>()));
}
}
@@ -2613,7 +2614,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// Reshuffle will dedup based on ids in ValueWithRecordId by passing the data through
// WindmillSink.
.apply(Reshuffle.<Integer, ValueWithRecordId<T>>of())
- .apply(ParDo.named("StripIds").of(
+ .apply("StripIds", ParDo.of(
new DoFn<KV<Integer, ValueWithRecordId<T>>, T>() {
@Override
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
index 1b18c2a..5f808a5 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
@@ -63,7 +63,7 @@ public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>>
} else {
// If the windowFn didn't change, we just run a pass-through transform and then set the
// new windowing strategy.
- return input.apply(ParDo.named("Identity").of(new DoFn<T, T>() {
+ return input.apply("Identity", ParDo.of(new DoFn<T, T>() {
@Override
public void processElement(DoFn<T, T>.ProcessContext c) throws Exception {
c.output(c.element());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 261ba99..c3a6a11 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -513,9 +513,9 @@ public class DataflowPipelineTranslatorTest implements Serializable {
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
Pipeline pipeline = Pipeline.create(options);
String stepName = "DoFn1";
- pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
- .apply(ParDo.of(new NoOpFn()).named(stepName))
- .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out"));
+ pipeline.apply("ReadMyFile", TextIO.Read.from("gs://bucket/in"))
+ .apply(stepName, ParDo.of(new NoOpFn()))
+ .apply("WriteMyFile", TextIO.Write.to("gs://bucket/out"));
Job job =
translator
.translate(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index ba13f9d..cfdd581 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -96,7 +97,7 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PInput, PCollection<T
}
}));
}
- return read.apply(ValueWithRecordId.<T>stripIds());
+ return read.apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn<T>()));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index a165c91..78758a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -420,22 +420,20 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
@Override
public PDone apply(PCollection<T> input) {
- input.apply(
- Window.named("PubsubUnboundedSink.Window")
- .<T>into(new GlobalWindows())
- .triggering(
- Repeatedly.forever(
- AfterFirst.of(AfterPane.elementCountAtLeast(publishBatchSize),
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(maxLatency))))
+ input.apply("PubsubUnboundedSink.Window", Window.<T>into(new GlobalWindows())
+ .triggering(
+ Repeatedly.forever(
+ AfterFirst.of(AfterPane.elementCountAtLeast(publishBatchSize),
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(maxLatency))))
.discardingFiredPanes())
- .apply(ParDo.named("PubsubUnboundedSink.Shard")
- .of(new ShardFn<T>(elementCoder, numShards, recordIdMethod)))
+ .apply("PubsubUnboundedSink.Shard",
+ ParDo.of(new ShardFn<T>(elementCoder, numShards, recordIdMethod)))
.setCoder(KvCoder.of(VarIntCoder.of(), CODER))
.apply(GroupByKey.<Integer, OutgoingMessage>create())
- .apply(ParDo.named("PubsubUnboundedSink.Writer")
- .of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel,
- publishBatchSize, publishBatchBytes)));
+ .apply("PubsubUnboundedSink.Writer",
+ ParDo.of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel,
+ publishBatchSize, publishBatchBytes)));
return PDone.in(input.getPipeline());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index e7634ec..07d355e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -1296,8 +1296,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
return input.getPipeline().begin()
.apply(Read.from(new PubsubSource<T>(this)))
- .apply(ParDo.named("PubsubUnboundedSource.Stats")
- .of(new StatsFn<T>(pubsubFactory, subscription,
- timestampLabel, idLabel)));
+ .apply("PubsubUnboundedSource.Stats",
+ ParDo.of(new StatsFn<T>(pubsubFactory, subscription, timestampLabel, idLabel)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index a29a56d..1a3d85d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -718,10 +718,8 @@ public class PAssert {
input
.apply(Create.of(0).withCoder(VarIntCoder.of()))
- .apply(
- ParDo.named("RunChecks")
- .withSideInputs(actual)
- .of(new SideInputCheckerDoFn<>(checkerFn, actual)));
+ .apply("RunChecks",
+ ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual)));
return PDone.in(input.getPipeline());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 20c1242..7871672 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
@@ -1400,7 +1401,7 @@ public class Combine {
final OutputT defaultValue = fn.defaultValue();
PCollection<OutputT> defaultIfEmpty = maybeEmpty.getPipeline()
.apply("CreateVoid", Create.of((Void) null).withCoder(VoidCoder.of()))
- .apply(ParDo.named("ProduceDefault").withSideInputs(maybeEmptyView).of(
+ .apply("ProduceDefault", ParDo.withSideInputs(maybeEmptyView).of(
new DoFn<Void, OutputT>() {
@Override
public void processElement(DoFn<Void, OutputT>.ProcessContext c) {
@@ -2024,28 +2025,27 @@ public class Combine {
// augmenting the hot keys with a nonce.
final TupleTag<KV<KV<K, Integer>, InputT>> hot = new TupleTag<>();
final TupleTag<KV<K, InputT>> cold = new TupleTag<>();
- PCollectionTuple split = input.apply(
- ParDo.named("AddNonce").of(
- new DoFn<KV<K, InputT>, KV<K, InputT>>() {
- transient int counter;
- @Override
- public void startBundle(Context c) {
- counter = ThreadLocalRandom.current().nextInt(
- Integer.MAX_VALUE);
- }
+ PCollectionTuple split = input.apply("AddNonce", ParDo.of(
+ new DoFn<KV<K, InputT>, KV<K, InputT>>() {
+ transient int counter;
+ @Override
+ public void startBundle(Context c) {
+ counter = ThreadLocalRandom.current().nextInt(
+ Integer.MAX_VALUE);
+ }
- @Override
- public void processElement(ProcessContext c) {
- KV<K, InputT> kv = c.element();
- int spread = Math.max(1, hotKeyFanout.apply(kv.getKey()));
- if (spread <= 1) {
- c.output(kv);
- } else {
- int nonce = counter++ % spread;
- c.sideOutput(hot, KV.of(KV.of(kv.getKey(), nonce), kv.getValue()));
- }
- }
- })
+ @Override
+ public void processElement(ProcessContext c) {
+ KV<K, InputT> kv = c.element();
+ int spread = Math.max(1, hotKeyFanout.apply(kv.getKey()));
+ if (spread <= 1) {
+ c.output(kv);
+ } else {
+ int nonce = counter++ % spread;
+ c.sideOutput(hot, KV.of(KV.of(kv.getKey(), nonce), kv.getValue()));
+ }
+ }
+ })
.withOutputTags(cold, TupleTagList.of(hot)));
// The first level of combine should never use accumulating mode.
@@ -2063,7 +2063,7 @@ public class Combine {
inputCoder.getValueCoder()))
.setWindowingStrategyInternal(preCombineStrategy)
.apply("PreCombineHot", Combine.perKey(hotPreCombine))
- .apply(ParDo.named("StripNonce").of(
+ .apply("StripNonce", ParDo.of(
new DoFn<KV<KV<K, Integer>, AccumT>,
KV<K, InputOrAccum<InputT, AccumT>>>() {
@Override
@@ -2079,7 +2079,7 @@ public class Combine {
PCollection<KV<K, InputOrAccum<InputT, AccumT>>> preprocessedCold = split
.get(cold)
.setCoder(inputCoder)
- .apply(ParDo.named("PrepareCold").of(
+ .apply("PrepareCold", ParDo.of(
new DoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
@Override
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 3bf264e..3a0fb5d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -107,7 +107,7 @@ public class Count {
public PCollection<KV<T, Long>> apply(PCollection<T> input) {
return
input
- .apply(ParDo.named("Init").of(new DoFn<T, KV<T, Void>>() {
+ .apply("Init", ParDo.of(new DoFn<T, KV<T, Void>>() {
@Override
public void processElement(ProcessContext c) {
c.output(KV.of(c.element(), (Void) null));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index 932643f..4f270a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -133,7 +133,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
@Override
public PCollection<OutputT> apply(PCollection<InputT> input) {
- return input.apply(ParDo.named("Map").of(new DoFn<InputT, OutputT>() {
+ return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() {
private static final long serialVersionUID = 0L;
@Override
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 93917f3..0b83fb6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -173,7 +173,7 @@ public class Flatten {
@SuppressWarnings("unchecked")
Coder<T> elemCoder = ((IterableLikeCoder<T, ?>) inCoder).getElemCoder();
- return in.apply(ParDo.named("FlattenIterables").of(
+ return in.apply("FlattenIterables", ParDo.of(
new DoFn<Iterable<T>, T>() {
@Override
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
index a1e5e0a..636e306 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
@@ -58,12 +58,11 @@ public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>,
@Override
public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) {
return
- in.apply(ParDo.named("Keys")
- .of(new DoFn<KV<K, ?>, K>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.element().getKey());
- }
- }));
+ in.apply("Keys", ParDo.of(new DoFn<KV<K, ?>, K>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element().getKey());
+ }
+ }));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
index 2dfc7c1..9597c92 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
@@ -62,13 +62,12 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>,
@Override
public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) {
return
- in.apply(ParDo.named("KvSwap")
- .of(new DoFn<KV<K, V>, KV<V, K>>() {
- @Override
- public void processElement(ProcessContext c) {
- KV<K, V> e = c.element();
- c.output(KV.of(e.getValue(), e.getKey()));
- }
- }));
+ in.apply("KvSwap", ParDo.of(new DoFn<KV<K, V>, KV<V, K>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ KV<K, V> e = c.element();
+ c.output(KV.of(e.getValue(), e.getKey()));
+ }
+ }));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index d64bad1..f535111 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -104,7 +104,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
@Override
public PCollection<OutputT> apply(PCollection<InputT> input) {
- return input.apply(ParDo.named("Map").of(new DoFn<InputT, OutputT>() {
+ return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() {
@Override
public void processElement(ProcessContext c) {
c.output(fn.apply(c.element()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 4bcfb29..a56eefc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -91,7 +91,7 @@ import java.io.Serializable;
*
* <pre> {@code
* ...
- * .apply(ParDo.named("Step1").of(new MyDoFn3()))
+ * .apply("Step1", ParDo.of(new MyDoFn3()))
* ...
* } </pre>
*
@@ -218,9 +218,8 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
// understand why all of its instance state is transient.
/**
- * The base name of this {@code PTransform}, e.g., from
- * {@link ParDo#named(String)}, or from defaults, or {@code null} if not
- * yet assigned.
+ * The base name of this {@code PTransform}, e.g., from defaults, or
+ * {@code null} if not yet assigned.
*/
protected final transient String name;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
index 84b1f80..b82744d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
@@ -85,13 +85,12 @@ public class RemoveDuplicates<T> extends PTransform<PCollection<T>,
@Override
public PCollection<T> apply(PCollection<T> in) {
return in
- .apply(ParDo.named("CreateIndex")
- .of(new DoFn<T, KV<T, Void>>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(KV.of(c.element(), (Void) null));
- }
- }))
+ .apply("CreateIndex", ParDo.of(new DoFn<T, KV<T, Void>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(c.element(), (Void) null));
+ }
+ }))
.apply(Combine.<T, Void>perKey(
new SerializableFunction<Iterable<Void>, Void>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
index 0b533b8..b3d38b9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
@@ -58,12 +58,11 @@ public class Values<V> extends PTransform<PCollection<? extends KV<?, V>>,
@Override
public PCollection<V> apply(PCollection<? extends KV<?, V>> in) {
return
- in.apply(ParDo.named("Values")
- .of(new DoFn<KV<?, V>, V>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.element().getValue());
- }
- }));
+ in.apply("Values", ParDo.of(new DoFn<KV<?, V>, V>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element().getValue());
+ }
+ }));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index 198e7cb..25116d8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -113,14 +113,13 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
@Override
public PCollection<KV<K, V>> apply(PCollection<V> in) {
PCollection<KV<K, V>> result =
- in.apply(ParDo.named("AddKeys")
- .of(new DoFn<V, KV<K, V>>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(KV.of(fn.apply(c.element()),
- c.element()));
- }
- }));
+ in.apply("AddKeys", ParDo.of(new DoFn<V, KV<K, V>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(fn.apply(c.element()),
+ c.element()));
+ }
+ }));
try {
Coder<K> keyCoder;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index eae8de5..ef4b269 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -101,7 +101,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
@Override
public PCollection<T> apply(PCollection<T> input) {
return input
- .apply(ParDo.named("AddTimestamps").of(new AddTimestampsDoFn<T>(fn, allowedTimestampSkew)))
+ .apply("AddTimestamps", ParDo.of(new AddTimestampsDoFn<T>(fn, allowedTimestampSkew)))
.setTypeDescriptorInternal(input.getTypeDescriptor());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
index d7ac5e4..ba4a4a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
@@ -128,9 +128,8 @@ public class CoGroupByKey<K> extends
flattenedTable.apply(GroupByKey.<K, RawUnionValue>create());
CoGbkResultSchema tupleTags = input.getCoGbkResultSchema();
- PCollection<KV<K, CoGbkResult>> result = groupedTable.apply(
- ParDo.of(new ConstructCoGbkResultFn<K>(tupleTags))
- .named("ConstructCoGbkResultFn"));
+ PCollection<KV<K, CoGbkResult>> result = groupedTable.apply("ConstructCoGbkResultFn",
+ ParDo.of(new ConstructCoGbkResultFn<K>(tupleTags)));
result.setCoder(KvCoder.of(keyCoder,
CoGbkResultCoder.of(tupleTags, unionCoder)));
@@ -163,9 +162,8 @@ public class CoGroupByKey<K> extends
PCollection<KV<K, V>> pCollection,
KvCoder<K, RawUnionValue> unionTableEncoder) {
- return pCollection.apply(ParDo.of(
- new ConstructUnionTableFn<K, V>(index)).named("MakeUnionTable" + index))
- .setCoder(unionTableEncoder);
+ return pCollection.apply("MakeUnionTable" + index,
+ ParDo.of(new ConstructUnionTableFn<K, V>(index))).setCoder(unionTableEncoder);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 324b4d5..7d790d4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -645,16 +645,6 @@ public class Window {
}
}
- /////////////////////////////////////////////////////////////////////////////
-
- private static <T> PTransform<PCollection<? extends T>, PCollection<T>> identity() {
- return ParDo.named("Identity").of(new DoFn<T, T>() {
- @Override public void processElement(ProcessContext c) {
- c.output(c.element());
- }
- });
- }
-
/**
* Creates a {@code Window} {@code PTransform} that does not change assigned
* windows, but will cause windows to be merged again as part of the next
@@ -675,7 +665,16 @@ public class Window {
WindowingStrategy<?, ?> outputWindowingStrategy = getOutputWindowing(
input.getWindowingStrategy());
- return input.apply(Window.<T>identity())
+ return input
+ // We first apply a (trivial) transform to the input PCollection to produce a new
+ // PCollection. This ensures that we don't modify the windowing strategy of the input
+ // which may be used elsewhere.
+ .apply("Identity", ParDo.of(new DoFn<T, T>() {
+ @Override public void processElement(ProcessContext c) {
+ c.output(c.element());
+ }
+ }))
+ // Then we modify the windowing strategy.
.setWindowingStrategyInternal(outputWindowingStrategy);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index c0d159b..6c58689 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+
import org.joda.time.Duration;
/**
@@ -68,7 +69,7 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
// Set the windowing strategy directly, so that it doesn't get counted as the user having
// set allowed lateness.
.setWindowingStrategyInternal(originalStrategy)
- .apply(ParDo.named("ExpandIterable").of(
+ .apply("ExpandIterable", ParDo.of(
new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
@Override
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
index 8e4e134..80dfcae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
@@ -21,9 +21,6 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
@@ -142,15 +139,11 @@ public class ValueWithRecordId<ValueT> {
ByteArrayCoder idCoder;
}
- public static <T>
- PTransform<PCollection<? extends ValueWithRecordId<T>>, PCollection<T>> stripIds() {
- return ParDo.named("StripIds")
- .of(
- new DoFn<ValueWithRecordId<T>, T>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.element().getValue());
- }
- });
+ /** {@link DoFn} to turn a {@code ValueWithRecordId<T>} back to the value {@code T}. */
+ public static class StripIdsDoFn<T> extends DoFn<ValueWithRecordId<T>, T> {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element().getValue());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 03ecf6f..db32fa6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -24,6 +24,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.include
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
+
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
@@ -815,37 +816,22 @@ public class ParDoTest implements Serializable {
.setName("MyInput");
{
- PCollection<String> output1 =
- input
- .apply(ParDo.of(new TestDoFn()));
+ PCollection<String> output1 = input.apply(ParDo.of(new TestDoFn()));
assertEquals("ParDo(Test).out", output1.getName());
}
{
- PCollection<String> output2 =
- input
- .apply(ParDo.named("MyParDo").of(new TestDoFn()));
+ PCollection<String> output2 = input.apply("MyParDo", ParDo.of(new TestDoFn()));
assertEquals("MyParDo.out", output2.getName());
}
{
- PCollection<String> output3 =
- input
- .apply(ParDo.of(new TestDoFn()).named("HerParDo"));
- assertEquals("HerParDo.out", output3.getName());
- }
-
- {
- PCollection<String> output4 =
- input
- .apply(ParDo.of(new TestDoFn()).named("TestDoFn"));
+ PCollection<String> output4 = input.apply("TestDoFn", ParDo.of(new TestDoFn()));
assertEquals("TestDoFn.out", output4.getName());
}
{
- PCollection<String> output5 =
- input
- .apply(ParDo.of(new StrangelyNamedDoer()));
+ PCollection<String> output5 = input.apply(ParDo.of(new StrangelyNamedDoer()));
assertEquals("ParDo(StrangelyNamedDoer).out",
output5.getName());
}
@@ -869,8 +855,7 @@ public class ParDoTest implements Serializable {
PCollectionTuple outputs = p
.apply(Create.of(Arrays.asList(3, -42, 666))).setName("MyInput")
- .apply(ParDo
- .named("MyParDo")
+ .apply("MyParDo", ParDo
.of(new TestDoFn(
Arrays.<PCollectionView<Integer>>asList(),
Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3)))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 5377f23..21f58df 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -52,7 +52,6 @@ import java.io.Serializable;
/** Unit tests for bucketing. */
@RunWith(JUnit4.class)
-@SuppressWarnings("unchecked")
public class WindowingTest implements Serializable {
@Rule
public transient TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -73,12 +72,11 @@ public class WindowingTest implements Serializable {
}
@Override
public PCollection<String> apply(PCollection<String> in) {
- return in.apply(
- Window.named("Window")
- .<String>into(windowFn)
+ return in.apply("Window",
+ Window.<String>into(windowFn)
.withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
.apply(Count.<String>perElement())
- .apply(ParDo.named("FormatCounts").of(new FormatCountsDoFn()))
+ .apply("FormatCounts", ParDo.of(new FormatCountsDoFn()))
.setCoder(StringUtf8Coder.of());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
index 98af2e7..be32afa 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
@@ -82,7 +82,7 @@ public class MinimalWordCount {
// DoFn (defined in-line) on each element that tokenizes the text line into individual words.
// The ParDo returns a PCollection<String>, where each element is an individual word in
// Shakespeare's collected texts.
- .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
+ .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
@@ -98,7 +98,7 @@ public class MinimalWordCount {
.apply(Count.<String>perElement())
// Apply another ParDo transform that formats our PCollection of word counts into a printable
// string, suitable for writing to an output file.
- .apply(ParDo.named("FormatResults").of(new DoFn<KV<String, Long>, String>() {
+ .apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().getKey() + ": " + c.element().getValue());