Posted to commits@beam.apache.org by bc...@apache.org on 2016/06/24 06:21:18 UTC

[1/3] incubator-beam git commit: Remove ParDo.named on all variants

Repository: incubator-beam
Updated Branches:
  refs/heads/master 7745b921f -> 41faee4f9


Remove ParDo.named on all variants
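
For reference, the pattern this change enforces is to pass the step name to
PCollection#apply rather than to the removed ParDo.named builder. A minimal
before/after sketch, assuming the old-style DoFn API used throughout these
diffs (class and step names here are illustrative, not taken from the commit):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

class ExtractWordsMigration {
  // Old (removed by this commit):
  //   lines.apply(ParDo.named("ExtractWords").of(new ExtractWordsFn()));
  // New: the stable step name is given to apply() instead.
  static PCollection<String> extractWords(PCollection<String> lines) {
    return lines.apply("ExtractWords", ParDo.of(new ExtractWordsFn()));
  }

  // Illustrative DoFn, modeled on the MinimalWordCount example further down.
  static class ExtractWordsFn extends DoFn<String, String> {
    @Override
    public void processElement(ProcessContext c) {
      for (String word : c.element().split("[^a-zA-Z']+")) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }
}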


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4f050bf5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4f050bf5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4f050bf5

Branch: refs/heads/master
Commit: 4f050bf56832b1cdeda7af1a58589a1ed952a41a
Parents: d7f2810
Author: Ben Chambers <bc...@google.com>
Authored: Thu Jun 23 09:57:55 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Jun 23 22:55:43 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/ParDo.java   | 70 +-------------------
 1 file changed, 3 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f050bf5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index cb7d372..16dfcac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -123,16 +123,13 @@ import java.util.List;
  * a unique name - which may not be stable across pipeline revision -
  * will be generated, based on the transform name.
  *
- * <p>If a {@link ParDo} is applied exactly once inlined, then
- * it can be given a name via {@link #named}. For example:
+ * <p>For example:
  *
  * <pre> {@code
  * PCollection<String> words =
- *     lines.apply(ParDo.named("ExtractWords")
- *                      .of(new DoFn<String, String>() { ... }));
+ *     lines.apply("ExtractWords", ParDo.of(new DoFn<String, String>() { ... }));
  * PCollection<Integer> wordLengths =
- *     words.apply(ParDo.named("ComputeWordLengths")
- *                      .of(new DoFn<String, Integer>() { ... }));
+ *     words.apply("ComputeWordLengths", ParDo.of(new DoFn<String, Integer>() { ... }));
  * } </pre>
  *
  * <h2>Side Inputs</h2>
@@ -437,21 +434,6 @@ import java.util.List;
 public class ParDo {
 
   /**
-   * Creates a {@link ParDo} {@link PTransform} with the given name.
-   *
-   * <p>See the discussion of naming above for more explanation.
-   *
-   * <p>The resulting {@link PTransform} is incomplete, and its
-   * input/output types are not yet bound. Use
-   * {@link ParDo.Unbound#of} to specify the {@link DoFn} to
-   * invoke, which will also bind the input/output types of this
-   * {@link PTransform}.
-   */
-  public static Unbound named(String name) {
-    return new Unbound().named(name);
-  }
-
-  /**
    * Creates a {@link ParDo} {@link PTransform} with the given
    * side inputs.
    *
@@ -588,17 +570,6 @@ public class ParDo {
 
     /**
      * Returns a new {@link ParDo} transform that's like this
-     * transform but with the specified name. Does not modify this
-     * transform. The resulting transform is still incomplete.
-     *
-     * <p>See the discussion of naming above for more explanation.
-     */
-    public Unbound named(String name) {
-      return new Unbound(name, sideInputs);
-    }
-
-    /**
-     * Returns a new {@link ParDo} transform that's like this
      * transform but with the specified additional side inputs.
      * Does not modify this transform. The resulting transform is
      * still incomplete.
@@ -703,17 +674,6 @@ public class ParDo {
 
     /**
      * Returns a new {@link ParDo} {@link PTransform} that's like this
-     * {@link PTransform} but with the specified name. Does not
-     * modify this {@link PTransform}.
-     *
-     * <p>See the discussion of Naming above for more explanation.
-     */
-    public Bound<InputT, OutputT> named(String name) {
-      return new Bound<>(name, sideInputs, fn, fnClass);
-    }
-
-    /**
-     * Returns a new {@link ParDo} {@link PTransform} that's like this
      * {@link PTransform} but with the specified additional side inputs. Does not
      * modify this {@link PTransform}.
      *
@@ -833,18 +793,6 @@ public class ParDo {
 
     /**
      * Returns a new multi-output {@link ParDo} transform that's like
-     * this transform but with the specified name. Does not modify
-     * this transform. The resulting transform is still incomplete.
-     *
-     * <p>See the discussion of Naming above for more explanation.
-     */
-    public UnboundMulti<OutputT> named(String name) {
-      return new UnboundMulti<>(
-          name, sideInputs, mainOutputTag, sideOutputTags);
-    }
-
-    /**
-     * Returns a new multi-output {@link ParDo} transform that's like
      * this transform but with the specified side inputs. Does not
      * modify this transform. The resulting transform is still
      * incomplete.
@@ -942,18 +890,6 @@ public class ParDo {
 
     /**
      * Returns a new multi-output {@link ParDo} {@link PTransform}
-     * that's like this {@link PTransform} but with the specified
-     * name. Does not modify this {@link PTransform}.
-     *
-     * <p>See the discussion of Naming above for more explanation.
-     */
-    public BoundMulti<InputT, OutputT> named(String name) {
-      return new BoundMulti<>(
-          name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass);
-    }
-
-    /**
-     * Returns a new multi-output {@link ParDo} {@link PTransform}
      * that's like this {@link PTransform} but with the specified additional side
      * inputs. Does not modify this {@link PTransform}.
      *


[3/3] incubator-beam git commit: This closes #522

Posted by bc...@apache.org.
This closes #522


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41faee4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41faee4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41faee4f

Branch: refs/heads/master
Commit: 41faee4f9cbe29ad6a80742789a0004abe59e684
Parents: 7745b92 4f050bf
Author: bchambers <bc...@google.com>
Authored: Thu Jun 23 22:55:46 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Jun 23 22:55:46 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/MinimalWordCount.java  |  2 +-
 .../beam/examples/complete/AutoComplete.java    |  4 +-
 .../apache/beam/examples/complete/TfIdf.java    | 13 ++--
 .../examples/complete/TopWikipediaSessions.java |  7 +-
 .../beam/examples/cookbook/FilterExamples.java  |  3 +-
 .../beam/examples/cookbook/JoinExamples.java    |  4 +-
 .../beam/examples/cookbook/TriggerExample.java  |  2 +-
 .../beam/examples/complete/game/GameStats.java  |  7 +-
 .../examples/complete/game/HourlyTeamScore.java |  4 +-
 .../examples/complete/game/LeaderBoard.java     |  4 +-
 .../beam/examples/complete/game/UserScore.java  |  2 +-
 .../complete/game/utils/WriteToBigQuery.java    |  2 +-
 .../game/utils/WriteWindowedToBigQuery.java     |  2 +-
 .../complete/game/HourlyTeamScoreTest.java      |  2 +-
 .../beam/runners/flink/examples/TFIDF.java      | 48 +++++++-------
 .../flink/examples/streaming/AutoComplete.java  |  4 +-
 .../flink/examples/streaming/JoinExamples.java  |  4 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  5 +-
 .../dataflow/internal/AssignWindows.java        |  2 +-
 .../DataflowPipelineTranslatorTest.java         |  6 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |  3 +-
 .../apache/beam/sdk/io/PubsubUnboundedSink.java | 24 +++----
 .../beam/sdk/io/PubsubUnboundedSource.java      |  5 +-
 .../org/apache/beam/sdk/testing/PAssert.java    |  6 +-
 .../org/apache/beam/sdk/transforms/Combine.java | 48 +++++++-------
 .../org/apache/beam/sdk/transforms/Count.java   |  2 +-
 .../beam/sdk/transforms/FlatMapElements.java    |  2 +-
 .../org/apache/beam/sdk/transforms/Flatten.java |  2 +-
 .../org/apache/beam/sdk/transforms/Keys.java    | 13 ++--
 .../org/apache/beam/sdk/transforms/KvSwap.java  | 15 ++---
 .../apache/beam/sdk/transforms/MapElements.java |  2 +-
 .../apache/beam/sdk/transforms/PTransform.java  |  7 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   | 70 +-------------------
 .../beam/sdk/transforms/RemoveDuplicates.java   | 13 ++--
 .../org/apache/beam/sdk/transforms/Values.java  | 13 ++--
 .../apache/beam/sdk/transforms/WithKeys.java    | 15 ++---
 .../beam/sdk/transforms/WithTimestamps.java     |  2 +-
 .../beam/sdk/transforms/join/CoGroupByKey.java  | 10 ++-
 .../beam/sdk/transforms/windowing/Window.java   | 21 +++---
 .../org/apache/beam/sdk/util/Reshuffle.java     |  3 +-
 .../apache/beam/sdk/util/ValueWithRecordId.java | 19 ++----
 .../apache/beam/sdk/transforms/ParDoTest.java   | 27 ++------
 .../sdk/transforms/windowing/WindowingTest.java |  8 +--
 .../src/main/java/MinimalWordCount.java         |  4 +-
 44 files changed, 180 insertions(+), 281 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Remove uses of ParDo.named

Posted by bc...@apache.org.
Remove uses of ParDo.named

Also removed a few other uses of .named() methods near the use-sites being cleaned up.
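
The same rewrite applies to the non-ParDo .named() calls touched here, e.g.
TextIO.Read/Write and Window in the hunks below. A short sketch, assuming this
SDK's TextIO API and reusing the placeholder paths that appear in the
DataflowPipelineTranslatorTest hunk:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

class NamedApplyExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Old (cleaned up in this commit):
    //   p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
    //    .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out"));
    // New: the step name moves onto the apply() call.
    PCollection<String> lines = p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/in"));
    lines.apply("WriteMyFile", TextIO.Write.to("gs://bucket/out"));

    p.run();
  }
}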


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d7f2810a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d7f2810a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d7f2810a

Branch: refs/heads/master
Commit: d7f2810ad24e805793d6d43a639a48845aac4e48
Parents: 7745b92
Author: Ben Chambers <bc...@google.com>
Authored: Thu Jun 23 09:53:40 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Jun 23 22:55:43 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/MinimalWordCount.java  |  2 +-
 .../beam/examples/complete/AutoComplete.java    |  4 +-
 .../apache/beam/examples/complete/TfIdf.java    | 13 +++---
 .../examples/complete/TopWikipediaSessions.java |  7 ++-
 .../beam/examples/cookbook/FilterExamples.java  |  3 +-
 .../beam/examples/cookbook/JoinExamples.java    |  4 +-
 .../beam/examples/cookbook/TriggerExample.java  |  2 +-
 .../beam/examples/complete/game/GameStats.java  |  7 ++-
 .../examples/complete/game/HourlyTeamScore.java |  4 +-
 .../examples/complete/game/LeaderBoard.java     |  4 +-
 .../beam/examples/complete/game/UserScore.java  |  2 +-
 .../complete/game/utils/WriteToBigQuery.java    |  2 +-
 .../game/utils/WriteWindowedToBigQuery.java     |  2 +-
 .../complete/game/HourlyTeamScoreTest.java      |  2 +-
 .../beam/runners/flink/examples/TFIDF.java      | 48 ++++++++++----------
 .../flink/examples/streaming/AutoComplete.java  |  4 +-
 .../flink/examples/streaming/JoinExamples.java  |  4 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  5 +-
 .../dataflow/internal/AssignWindows.java        |  2 +-
 .../DataflowPipelineTranslatorTest.java         |  6 +--
 .../sdk/io/BoundedReadFromUnboundedSource.java  |  3 +-
 .../apache/beam/sdk/io/PubsubUnboundedSink.java | 24 +++++-----
 .../beam/sdk/io/PubsubUnboundedSource.java      |  5 +-
 .../org/apache/beam/sdk/testing/PAssert.java    |  6 +--
 .../org/apache/beam/sdk/transforms/Combine.java | 48 ++++++++++----------
 .../org/apache/beam/sdk/transforms/Count.java   |  2 +-
 .../beam/sdk/transforms/FlatMapElements.java    |  2 +-
 .../org/apache/beam/sdk/transforms/Flatten.java |  2 +-
 .../org/apache/beam/sdk/transforms/Keys.java    | 13 +++---
 .../org/apache/beam/sdk/transforms/KvSwap.java  | 15 +++---
 .../apache/beam/sdk/transforms/MapElements.java |  2 +-
 .../apache/beam/sdk/transforms/PTransform.java  |  7 ++-
 .../beam/sdk/transforms/RemoveDuplicates.java   | 13 +++---
 .../org/apache/beam/sdk/transforms/Values.java  | 13 +++---
 .../apache/beam/sdk/transforms/WithKeys.java    | 15 +++---
 .../beam/sdk/transforms/WithTimestamps.java     |  2 +-
 .../beam/sdk/transforms/join/CoGroupByKey.java  | 10 ++--
 .../beam/sdk/transforms/windowing/Window.java   | 21 ++++-----
 .../org/apache/beam/sdk/util/Reshuffle.java     |  3 +-
 .../apache/beam/sdk/util/ValueWithRecordId.java | 19 +++-----
 .../apache/beam/sdk/transforms/ParDoTest.java   | 27 +++--------
 .../sdk/transforms/windowing/WindowingTest.java |  8 ++--
 .../src/main/java/MinimalWordCount.java         |  4 +-
 43 files changed, 177 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
index 355a1ff..2c67609 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -92,7 +92,7 @@ public class MinimalWordCount {
      // DoFn (defined in-line) on each element that tokenizes the text line into individual words.
      // The ParDo returns a PCollection<String>, where each element is an individual word in
      // Shakespeare's collected texts.
-     .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
+     .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
                        @Override
                        public void processElement(ProcessContext c) {
                          for (String word : c.element().split("[^a-zA-Z']+")) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index c6893f4..2732aa5 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -130,7 +130,7 @@ public class AutoComplete {
         .apply(new Count.PerElement<String>())
 
         // Map the KV outputs of Count into our own CompletionCandiate class.
-        .apply(ParDo.named("CreateCompletionCandidates").of(
+        .apply("CreateCompletionCandidates", ParDo.of(
             new DoFn<KV<String, Long>, CompletionCandidate>() {
               @Override
               public void processElement(ProcessContext c) {
@@ -481,7 +481,7 @@ public class AutoComplete {
 
     if (options.getOutputToDatastore()) {
       toWrite
-      .apply(ParDo.named("FormatForDatastore").of(new FormatForDatastore(options.getKind())))
+      .apply("FormatForDatastore", ParDo.of(new FormatForDatastore(options.getKind())))
       .apply(DatastoreIO.writeTo(MoreObjects.firstNonNull(
           options.getOutputProject(), options.getProject())));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 73f3323..23653d6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -225,7 +225,7 @@ public class TfIdf {
       // Create a collection of pairs mapping a URI to each
       // of the words in the document associated with that that URI.
       PCollection<KV<URI, String>> uriToWords = uriToContent
-          .apply(ParDo.named("SplitWords").of(
+          .apply("SplitWords", ParDo.of(
               new DoFn<KV<URI, String>, KV<URI, String>>() {
                 @Override
                 public void processElement(ProcessContext c) {
@@ -268,7 +268,7 @@ public class TfIdf {
       // from URI to (word, count) pairs, to prepare for a join
       // by the URI key.
       PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
-          .apply(ParDo.named("ShiftKeys").of(
+          .apply("ShiftKeys", ParDo.of(
               new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
                 @Override
                 public void processElement(ProcessContext c) {
@@ -307,7 +307,7 @@ public class TfIdf {
       // is simply the number of times that word occurs in the document
       // divided by the total number of words in the document.
       PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
-          .apply(ParDo.named("ComputeTermFrequencies").of(
+          .apply("ComputeTermFrequencies", ParDo.of(
               new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
                 @Override
                 public void processElement(ProcessContext c) {
@@ -331,8 +331,7 @@ public class TfIdf {
       // documents is passed as a side input; the same value is
       // presented to each invocation of the DoFn.
       PCollection<KV<String, Double>> wordToDf = wordToDocCount
-          .apply(ParDo
-              .named("ComputeDocFrequencies")
+          .apply("ComputeDocFrequencies", ParDo
               .withSideInputs(totalDocuments)
               .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
                 @Override
@@ -362,7 +361,7 @@ public class TfIdf {
       // here we use a basic version that is the term frequency
       // divided by the log of the document frequency.
       PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = wordToUriAndTfAndDf
-          .apply(ParDo.named("ComputeTfIdf").of(
+          .apply("ComputeTfIdf", ParDo.of(
               new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
                 @Override
                 public void processElement(ProcessContext c) {
@@ -402,7 +401,7 @@ public class TfIdf {
     @Override
     public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
       return wordToUriAndTfIdf
-          .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() {
+          .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
             @Override
             public void processElement(ProcessContext c) {
               c.output(String.format("%s,\t%s,\t%f",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index 2c0d0eb..5d95e3f 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -168,7 +168,7 @@ public class TopWikipediaSessions {
       return input
           .apply(ParDo.of(new ExtractUserAndTimestamp()))
 
-          .apply(ParDo.named("SampleUsers").of(
+          .apply("SampleUsers", ParDo.of(
               new DoFn<String, String>() {
                 @Override
                 public void processElement(ProcessContext c) {
@@ -179,10 +179,9 @@ public class TopWikipediaSessions {
               }))
 
           .apply(new ComputeSessions())
-
-          .apply(ParDo.named("SessionsToStrings").of(new SessionsToStringsDoFn()))
+          .apply("SessionsToStrings", ParDo.of(new SessionsToStringsDoFn()))
           .apply(new TopPerMonth())
-          .apply(ParDo.named("FormatOutput").of(new FormatOutputDoFn()));
+          .apply("FormatOutput", ParDo.of(new FormatOutputDoFn()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index a669b0c..017be21 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -189,8 +189,7 @@ public class FilterExamples {
       // that is computed earlier in pipeline execution.
       // We'll only output readings with temperatures below this mean.
       PCollection<TableRow> filteredRows = monthFilteredRows
-          .apply(ParDo
-              .named("ParseAndFilter")
+          .apply("ParseAndFilter", ParDo
               .withSideInputs(globalMeanTemp)
               .of(new DoFn<TableRow, TableRow>() {
                 @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index e8f1f01..3c26123 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -98,7 +98,7 @@ public class JoinExamples {
     // Process the CoGbkResult elements generated by the CoGroupByKey transform.
     // country code 'key' -> string of <event info>, <country name>
     PCollection<KV<String, String>> finalResultCollection =
-      kvpCollection.apply(ParDo.named("Process").of(
+      kvpCollection.apply("Process", ParDo.of(
         new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
           @Override
           public void processElement(ProcessContext c) {
@@ -116,7 +116,7 @@ public class JoinExamples {
 
     // write to GCS
     PCollection<String> formattedResults = finalResultCollection
-        .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
+        .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
           @Override
           public void processElement(ProcessContext c) {
             String outputstring = "Country code: " + c.element().getKey()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index c614550..ab1fb66 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -494,7 +494,7 @@ public class TriggerExample {
     Pipeline injectorPipeline = Pipeline.create(copiedOptions);
     injectorPipeline
     .apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
-    .apply(ParDo.named("InsertRandomDelays").of(new InsertDelays()))
+    .apply("InsertRandomDelays", ParDo.of(new InsertDelays()))
     .apply(IntraBundleParallelization.of(PubsubFileInjector
         .withTimestampLabelKey(PUBSUB_TIMESTAMP_LABEL_KEY)
         .publish(options.getPubsubTopic()))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index c8bcc8c..ad8b49e 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -123,8 +123,7 @@ public class GameStats extends LeaderBoard {
 
       // Filter the user sums using the global mean.
       PCollection<KV<String, Integer>> filtered = sumScores
-          .apply(ParDo
-              .named("ProcessAndFilter")
+          .apply("ProcessAndFilter", ParDo
               // use the derived mean total score as a side input
               .withSideInputs(globalMeanScore)
               .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
@@ -249,7 +248,7 @@ public class GameStats extends LeaderBoard {
     // Read Events from Pub/Sub using custom timestamps
     PCollection<GameActionInfo> rawEvents = pipeline
         .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
-        .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));
+        .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 
     // Extract username/score pairs from the event stream
     PCollection<KV<String, Integer>> userEvents =
@@ -284,7 +283,7 @@ public class GameStats extends LeaderBoard {
               Duration.standardMinutes(options.getFixedWindowDuration())))
           )
       // Filter out the detected spammer users, using the side input derived above.
-      .apply(ParDo.named("FilterOutSpammers")
+      .apply("FilterOutSpammers", ParDo
               .withSideInputs(spammersView)
               .of(new DoFn<GameActionInfo, GameActionInfo>() {
                 @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index c5c2fb5..7a808ac 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -109,9 +109,11 @@ public class HourlyTeamScore extends UserScore {
     String getStopMin();
     void setStopMin(String value);
 
+    @Override
     @Description("The BigQuery table name. Should not already exist.")
     @Default.String("hourly_team_score")
     String getTableName();
+    @Override
     void setTableName(String value);
   }
 
@@ -155,7 +157,7 @@ public class HourlyTeamScore extends UserScore {
     // Read 'gaming' events from a text file.
     pipeline.apply(TextIO.Read.from(options.getInput()))
       // Parse the incoming data.
-      .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
+      .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
 
       // Filter out data before and after the given times so that it is not included
       // in the calculations. As we collect data in batches (say, by day), the batch for the day

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 12d2729..2c608aa 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -119,9 +119,11 @@ public class LeaderBoard extends HourlyTeamScore {
     Integer getAllowedLateness();
     void setAllowedLateness(Integer value);
 
+    @Override
     @Description("Prefix used for the BigQuery table names")
     @Default.String("leaderboard")
     String getTableName();
+    @Override
     void setTableName(String value);
   }
 
@@ -183,7 +185,7 @@ public class LeaderBoard extends HourlyTeamScore {
     // data elements, and parse the data.
     PCollection<GameActionInfo> gameEvents = pipeline
         .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
-        .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));
+        .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 
     // [START DocInclude_WindowAndTrigger]
     // Extract team/score pairs from the event stream, using hour-long windows by default.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 97b6929..28614cb 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -226,7 +226,7 @@ public class UserScore {
 
     // Read events from a text file and parse them.
     pipeline.apply(TextIO.Read.from(options.getInput()))
-      .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
+      .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
       // Extract and sum username/score pairs from the event data.
       .apply("ExtractUserScore", new ExtractAndSumScore("user"))
       .apply("WriteUserScoreSums",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index 5897e44..5b472d7 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -115,7 +115,7 @@ public class WriteToBigQuery<T>
   @Override
   public PDone apply(PCollection<T> teamAndScore) {
     return teamAndScore
-      .apply(ParDo.named("ConvertToRow").of(new BuildRowFn()))
+      .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
       .apply(BigQueryIO.Write
                 .to(getTable(teamAndScore.getPipeline(),
                     tableName))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index 27697db..b1ccaed 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -66,7 +66,7 @@ public class WriteWindowedToBigQuery<T>
   @Override
   public PDone apply(PCollection<T> teamAndScore) {
     return teamAndScore
-      .apply(ParDo.named("ConvertToRow").of(new BuildRowFn()))
+      .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
       .apply(BigQueryIO.Write
                 .to(getTable(teamAndScore.getPipeline(),
                     tableName))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
index 4254902..b917b4c 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
@@ -94,7 +94,7 @@ public class HourlyTeamScoreTest implements Serializable {
     PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
 
     PCollection<KV<String, Integer>> output = input
-      .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
+      .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
 
       .apply("FilterStartTime", Filter.by(
           (GameActionInfo gInfo)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index af920aa..084ac7c 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -231,26 +231,25 @@ public class TFIDF {
       // Create a collection of pairs mapping a URI to each
       // of the words in the document associated with that that URI.
       PCollection<KV<URI, String>> uriToWords = uriToContent
-          .apply(ParDo.named("SplitWords").of(
-              new DoFn<KV<URI, String>, KV<URI, String>>() {
-                private static final long serialVersionUID = 0;
+          .apply("SplitWords", ParDo.of(new DoFn<KV<URI, String>, KV<URI, String>>() {
+            private static final long serialVersionUID = 0;
 
-                @Override
-                public void processElement(ProcessContext c) {
-                  URI uri = c.element().getKey();
-                  String line = c.element().getValue();
-                  for (String word : line.split("\\W+")) {
-                    // Log INFO messages when the word “love” is found.
-                    if (word.toLowerCase().equals("love")) {
-                      LOG.info("Found {}", word.toLowerCase());
-                    }
-
-                    if (!word.isEmpty()) {
-                      c.output(KV.of(uri, word.toLowerCase()));
-                    }
-                  }
+            @Override
+            public void processElement(ProcessContext c) {
+              URI uri = c.element().getKey();
+              String line = c.element().getValue();
+              for (String word : line.split("\\W+")) {
+                // Log INFO messages when the word “love” is found.
+                if (word.toLowerCase().equals("love")) {
+                  LOG.info("Found {}", word.toLowerCase());
                 }
-              }));
+
+                if (!word.isEmpty()) {
+                  c.output(KV.of(uri, word.toLowerCase()));
+                }
+              }
+            }
+          }));
 
       // Compute a mapping from each word to the total
       // number of documents in which it appears.
@@ -276,7 +275,7 @@ public class TFIDF {
       // from URI to (word, count) pairs, to prepare for a join
       // by the URI key.
       PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
-          .apply(ParDo.named("ShiftKeys").of(
+          .apply("ShiftKeys", ParDo.of(
               new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
                 private static final long serialVersionUID = 0;
 
@@ -317,7 +316,7 @@ public class TFIDF {
       // is simply the number of times that word occurs in the document
       // divided by the total number of words in the document.
       PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
-          .apply(ParDo.named("ComputeTermFrequencies").of(
+          .apply("ComputeTermFrequencies", ParDo.of(
               new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
                 private static final long serialVersionUID = 0;
 
@@ -343,8 +342,7 @@ public class TFIDF {
       // documents is passed as a side input; the same value is
       // presented to each invocation of the DoFn.
       PCollection<KV<String, Double>> wordToDf = wordToDocCount
-          .apply(ParDo
-              .named("ComputeDocFrequencies")
+          .apply("ComputeDocFrequencies", ParDo
               .withSideInputs(totalDocuments)
               .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
                 private static final long serialVersionUID = 0;
@@ -377,9 +375,9 @@ public class TFIDF {
       // divided by the log of the document frequency.
 
       return wordToUriAndTfAndDf
-          .apply(ParDo.named("ComputeTfIdf").of(
+          .apply("ComputeTfIdf", ParDo.of(
               new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
-                private static final long serialVersionUID1 = 0;
+                private static final long serialVersionUID = 0;
 
                 @Override
                 public void processElement(ProcessContext c) {
@@ -419,7 +417,7 @@ public class TFIDF {
     @Override
     public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
       return wordToUriAndTfIdf
-          .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() {
+          .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
             private static final long serialVersionUID = 0;
 
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 9299955..ed11781 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -92,7 +92,7 @@ public class AutoComplete {
         .apply(new Count.PerElement<String>())
 
         // Map the KV outputs of Count into our own CompletionCandiate class.
-        .apply(ParDo.named("CreateCompletionCandidates").of(
+        .apply("CreateCompletionCandidates", ParDo.of(
             new DoFn<KV<String, Long>, CompletionCandidate>() {
               private static final long serialVersionUID = 0;
 
@@ -395,7 +395,7 @@ public class AutoComplete {
       .apply(ComputeTopCompletions.top(10, options.getRecursive()));
 
     toWrite
-      .apply(ParDo.named("FormatForPerTaskFile").of(new FormatForPerTaskLocalFile()))
+      .apply("FormatForPerTaskFile", ParDo.of(new FormatForPerTaskLocalFile()))
       .apply(TextIO.Write.to("./outputAutoComplete.txt"));
 
     p.run();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index b447a20..0828ddc 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -77,7 +77,7 @@ public class JoinExamples {
     // Process the CoGbkResult elements generated by the CoGroupByKey transform.
     // country code 'key' -> string of <event info>, <country name>
     PCollection<KV<String, String>> finalResultCollection =
-        kvpCollection.apply(ParDo.named("Process").of(
+        kvpCollection.apply("Process", ParDo.of(
             new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
               private static final long serialVersionUID = 0;
 
@@ -100,7 +100,7 @@ public class JoinExamples {
             }));
 
     return finalResultCollection
-        .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
+        .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
           private static final long serialVersionUID = 0;
 
           @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 5818ba5..d47d285 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -179,6 +179,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+
 import javax.annotation.Nullable;
 
 /**
@@ -2540,7 +2541,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             .apply(new Deduplicate<T>());
       } else {
         return Pipeline.applyTransform(input, new ReadWithIds<T>(source))
-            .apply(ValueWithRecordId.<T>stripIds());
+            .apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn<T>()));
       }
     }
 
@@ -2613,7 +2614,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
           // Reshuffle will dedup based on ids in ValueWithRecordId by passing the data through
           // WindmillSink.
           .apply(Reshuffle.<Integer, ValueWithRecordId<T>>of())
-          .apply(ParDo.named("StripIds").of(
+          .apply("StripIds", ParDo.of(
               new DoFn<KV<Integer, ValueWithRecordId<T>>, T>() {
                 @Override
                 public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
index 1b18c2a..5f808a5 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
@@ -63,7 +63,7 @@ public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>>
     } else {
       // If the windowFn didn't change, we just run a pass-through transform and then set the
       // new windowing strategy.
-      return input.apply(ParDo.named("Identity").of(new DoFn<T, T>() {
+      return input.apply("Identity", ParDo.of(new DoFn<T, T>() {
         @Override
         public void processElement(DoFn<T, T>.ProcessContext c) throws Exception {
           c.output(c.element());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 261ba99..c3a6a11 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -513,9 +513,9 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
     Pipeline pipeline = Pipeline.create(options);
     String stepName = "DoFn1";
-    pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
-        .apply(ParDo.of(new NoOpFn()).named(stepName))
-        .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out"));
+    pipeline.apply("ReadMyFile", TextIO.Read.from("gs://bucket/in"))
+        .apply(stepName, ParDo.of(new NoOpFn()))
+        .apply("WriteMyFile", TextIO.Write.to("gs://bucket/out"));
     Job job =
         translator
             .translate(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index ba13f9d..cfdd581 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -96,7 +97,7 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PInput, PCollection<T
             }
           }));
     }
-    return read.apply(ValueWithRecordId.<T>stripIds());
+    return read.apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn<T>()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index a165c91..78758a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -420,22 +420,20 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
 
   @Override
   public PDone apply(PCollection<T> input) {
-    input.apply(
-        Window.named("PubsubUnboundedSink.Window")
-            .<T>into(new GlobalWindows())
-            .triggering(
-                Repeatedly.forever(
-                    AfterFirst.of(AfterPane.elementCountAtLeast(publishBatchSize),
-                                  AfterProcessingTime.pastFirstElementInPane()
-                                                     .plusDelayOf(maxLatency))))
+    input.apply("PubsubUnboundedSink.Window", Window.<T>into(new GlobalWindows())
+        .triggering(
+            Repeatedly.forever(
+                AfterFirst.of(AfterPane.elementCountAtLeast(publishBatchSize),
+                    AfterProcessingTime.pastFirstElementInPane()
+                    .plusDelayOf(maxLatency))))
             .discardingFiredPanes())
-         .apply(ParDo.named("PubsubUnboundedSink.Shard")
-                     .of(new ShardFn<T>(elementCoder, numShards, recordIdMethod)))
+         .apply("PubsubUnboundedSink.Shard",
+             ParDo.of(new ShardFn<T>(elementCoder, numShards, recordIdMethod)))
          .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
          .apply(GroupByKey.<Integer, OutgoingMessage>create())
-         .apply(ParDo.named("PubsubUnboundedSink.Writer")
-                     .of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel,
-                                      publishBatchSize, publishBatchBytes)));
+         .apply("PubsubUnboundedSink.Writer",
+             ParDo.of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel,
+                 publishBatchSize, publishBatchBytes)));
     return PDone.in(input.getPipeline());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index e7634ec..07d355e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -1296,8 +1296,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
 
     return input.getPipeline().begin()
                 .apply(Read.from(new PubsubSource<T>(this)))
-                .apply(ParDo.named("PubsubUnboundedSource.Stats")
-                            .of(new StatsFn<T>(pubsubFactory, subscription,
-                                               timestampLabel, idLabel)));
+                .apply("PubsubUnboundedSource.Stats",
+                    ParDo.of(new StatsFn<T>(pubsubFactory, subscription, timestampLabel, idLabel)));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index a29a56d..1a3d85d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -718,10 +718,8 @@ public class PAssert {
 
       input
           .apply(Create.of(0).withCoder(VarIntCoder.of()))
-          .apply(
-              ParDo.named("RunChecks")
-                  .withSideInputs(actual)
-                  .of(new SideInputCheckerDoFn<>(checkerFn, actual)));
+          .apply("RunChecks",
+              ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual)));
 
       return PDone.in(input.getPipeline());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 20c1242..7871672 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
@@ -1400,7 +1401,7 @@ public class Combine {
       final OutputT defaultValue = fn.defaultValue();
       PCollection<OutputT> defaultIfEmpty = maybeEmpty.getPipeline()
           .apply("CreateVoid", Create.of((Void) null).withCoder(VoidCoder.of()))
-          .apply(ParDo.named("ProduceDefault").withSideInputs(maybeEmptyView).of(
+          .apply("ProduceDefault", ParDo.withSideInputs(maybeEmptyView).of(
               new DoFn<Void, OutputT>() {
                 @Override
                 public void processElement(DoFn<Void, OutputT>.ProcessContext c) {
@@ -2024,28 +2025,27 @@ public class Combine {
       // augmenting the hot keys with a nonce.
       final TupleTag<KV<KV<K, Integer>, InputT>> hot = new TupleTag<>();
       final TupleTag<KV<K, InputT>> cold = new TupleTag<>();
-      PCollectionTuple split = input.apply(
-          ParDo.named("AddNonce").of(
-              new DoFn<KV<K, InputT>, KV<K, InputT>>() {
-                transient int counter;
-                @Override
-                public void startBundle(Context c) {
-                  counter = ThreadLocalRandom.current().nextInt(
-                      Integer.MAX_VALUE);
-                }
+      PCollectionTuple split = input.apply("AddNonce", ParDo.of(
+          new DoFn<KV<K, InputT>, KV<K, InputT>>() {
+            transient int counter;
+            @Override
+            public void startBundle(Context c) {
+              counter = ThreadLocalRandom.current().nextInt(
+                  Integer.MAX_VALUE);
+            }
 
-                @Override
-                public void processElement(ProcessContext c) {
-                  KV<K, InputT> kv = c.element();
-                  int spread = Math.max(1, hotKeyFanout.apply(kv.getKey()));
-                  if (spread <= 1) {
-                    c.output(kv);
-                  } else {
-                    int nonce = counter++ % spread;
-                    c.sideOutput(hot, KV.of(KV.of(kv.getKey(), nonce), kv.getValue()));
-                  }
-                }
-              })
+            @Override
+            public void processElement(ProcessContext c) {
+              KV<K, InputT> kv = c.element();
+              int spread = Math.max(1, hotKeyFanout.apply(kv.getKey()));
+              if (spread <= 1) {
+                c.output(kv);
+              } else {
+                int nonce = counter++ % spread;
+                c.sideOutput(hot, KV.of(KV.of(kv.getKey(), nonce), kv.getValue()));
+              }
+            }
+          })
           .withOutputTags(cold, TupleTagList.of(hot)));
 
       // The first level of combine should never use accumulating mode.
@@ -2063,7 +2063,7 @@ public class Combine {
                                inputCoder.getValueCoder()))
           .setWindowingStrategyInternal(preCombineStrategy)
           .apply("PreCombineHot", Combine.perKey(hotPreCombine))
-          .apply(ParDo.named("StripNonce").of(
+          .apply("StripNonce", ParDo.of(
               new DoFn<KV<KV<K, Integer>, AccumT>,
                        KV<K, InputOrAccum<InputT, AccumT>>>() {
                 @Override
@@ -2079,7 +2079,7 @@ public class Combine {
       PCollection<KV<K, InputOrAccum<InputT, AccumT>>> preprocessedCold = split
           .get(cold)
           .setCoder(inputCoder)
-          .apply(ParDo.named("PrepareCold").of(
+          .apply("PrepareCold", ParDo.of(
               new DoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
                 @Override
                 public void processElement(ProcessContext c) {

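The Combine.java hunks above show the migration pattern for ParDo applications that carry extra configuration: the name moves into apply(), and the chain now starts from ParDo.of or another static builder such as ParDo.withSideInputs. A minimal sketch of the resulting idiom, assuming an existing PCollection<String> input, a final PCollectionView<Integer> limitView, and hypothetical output tags (none of these names are part of this commit):

    final TupleTag<String> mainTag = new TupleTag<String>(){};
    final TupleTag<String> overflowTag = new TupleTag<String>(){};
    PCollectionTuple results = input.apply("SplitOnLimit",
        ParDo.withSideInputs(limitView)
             .withOutputTags(mainTag, TupleTagList.of(overflowTag))
             .of(new DoFn<String, String>() {
               @Override
               public void processElement(ProcessContext c) {
                 // Elements longer than the side-input limit go to the overflow output.
                 if (c.element().length() > c.sideInput(limitView)) {
                   c.sideOutput(overflowTag, c.element());
                 } else {
                   c.output(c.element());
                 }
               }
             }));
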
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 3bf264e..3a0fb5d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -107,7 +107,7 @@ public class Count {
     public PCollection<KV<T, Long>> apply(PCollection<T> input) {
       return
           input
-          .apply(ParDo.named("Init").of(new DoFn<T, KV<T, Void>>() {
+          .apply("Init", ParDo.of(new DoFn<T, KV<T, Void>>() {
             @Override
             public void processElement(ProcessContext c) {
               c.output(KV.of(c.element(), (Void) null));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index 932643f..4f270a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -133,7 +133,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
 
   @Override
   public PCollection<OutputT> apply(PCollection<InputT> input) {
-    return input.apply(ParDo.named("Map").of(new DoFn<InputT, OutputT>() {
+    return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() {
       private static final long serialVersionUID = 0L;
       @Override
       public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 93917f3..0b83fb6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -173,7 +173,7 @@ public class Flatten {
       @SuppressWarnings("unchecked")
       Coder<T> elemCoder = ((IterableLikeCoder<T, ?>) inCoder).getElemCoder();
 
-      return in.apply(ParDo.named("FlattenIterables").of(
+      return in.apply("FlattenIterables", ParDo.of(
           new DoFn<Iterable<T>, T>() {
             @Override
             public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
index a1e5e0a..636e306 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
@@ -58,12 +58,11 @@ public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>,
   @Override
   public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) {
     return
-        in.apply(ParDo.named("Keys")
-                 .of(new DoFn<KV<K, ?>, K>() {
-                     @Override
-                     public void processElement(ProcessContext c) {
-                       c.output(c.element().getKey());
-                     }
-                    }));
+        in.apply("Keys", ParDo.of(new DoFn<KV<K, ?>, K>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            c.output(c.element().getKey());
+          }
+        }));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
index 2dfc7c1..9597c92 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
@@ -62,13 +62,12 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>,
   @Override
   public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) {
     return
-        in.apply(ParDo.named("KvSwap")
-                 .of(new DoFn<KV<K, V>, KV<V, K>>() {
-                     @Override
-                     public void processElement(ProcessContext c) {
-                       KV<K, V> e = c.element();
-                       c.output(KV.of(e.getValue(), e.getKey()));
-                     }
-                    }));
+        in.apply("KvSwap", ParDo.of(new DoFn<KV<K, V>, KV<V, K>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            KV<K, V> e = c.element();
+            c.output(KV.of(e.getValue(), e.getKey()));
+          }
+        }));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index d64bad1..f535111 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -104,7 +104,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
 
   @Override
   public PCollection<OutputT> apply(PCollection<InputT> input) {
-    return input.apply(ParDo.named("Map").of(new DoFn<InputT, OutputT>() {
+    return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() {
       @Override
       public void processElement(ProcessContext c) {
         c.output(fn.apply(c.element()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 4bcfb29..a56eefc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -91,7 +91,7 @@ import java.io.Serializable;
  *
  * <pre> {@code
  * ...
- * .apply(ParDo.named("Step1").of(new MyDoFn3()))
+ * .apply("Step1", ParDo.of(new MyDoFn3()))
  * ...
  * } </pre>
  *
@@ -218,9 +218,8 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
   // understand why all of its instance state is transient.
 
   /**
-   * The base name of this {@code PTransform}, e.g., from
-   * {@link ParDo#named(String)}, or from defaults, or {@code null} if not
-   * yet assigned.
+   * The base name of this {@code PTransform}, e.g., from defaults, or
+   * {@code null} if not yet assigned.
    */
   protected final transient String name;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
index 84b1f80..b82744d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
@@ -85,13 +85,12 @@ public class RemoveDuplicates<T> extends PTransform<PCollection<T>,
   @Override
   public PCollection<T> apply(PCollection<T> in) {
     return in
-        .apply(ParDo.named("CreateIndex")
-            .of(new DoFn<T, KV<T, Void>>() {
-                  @Override
-                  public void processElement(ProcessContext c) {
-                    c.output(KV.of(c.element(), (Void) null));
-                  }
-                }))
+        .apply("CreateIndex", ParDo.of(new DoFn<T, KV<T, Void>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            c.output(KV.of(c.element(), (Void) null));
+          }
+        }))
         .apply(Combine.<T, Void>perKey(
             new SerializableFunction<Iterable<Void>, Void>() {
               @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
index 0b533b8..b3d38b9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
@@ -58,12 +58,11 @@ public class Values<V> extends PTransform<PCollection<? extends KV<?, V>>,
   @Override
   public PCollection<V> apply(PCollection<? extends KV<?, V>> in) {
     return
-        in.apply(ParDo.named("Values")
-                 .of(new DoFn<KV<?, V>, V>() {
-                     @Override
-                     public void processElement(ProcessContext c) {
-                       c.output(c.element().getValue());
-                     }
-                    }));
+        in.apply("Values", ParDo.of(new DoFn<KV<?, V>, V>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            c.output(c.element().getValue());
+          }
+        }));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index 198e7cb..25116d8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -113,14 +113,13 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
   @Override
   public PCollection<KV<K, V>> apply(PCollection<V> in) {
     PCollection<KV<K, V>> result =
-        in.apply(ParDo.named("AddKeys")
-                 .of(new DoFn<V, KV<K, V>>() {
-                     @Override
-                     public void processElement(ProcessContext c) {
-                       c.output(KV.of(fn.apply(c.element()),
-                                    c.element()));
-                     }
-                    }));
+        in.apply("AddKeys", ParDo.of(new DoFn<V, KV<K, V>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            c.output(KV.of(fn.apply(c.element()),
+                c.element()));
+          }
+        }));
 
     try {
       Coder<K> keyCoder;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index eae8de5..ef4b269 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -101,7 +101,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
   @Override
   public PCollection<T> apply(PCollection<T> input) {
     return input
-        .apply(ParDo.named("AddTimestamps").of(new AddTimestampsDoFn<T>(fn, allowedTimestampSkew)))
+        .apply("AddTimestamps", ParDo.of(new AddTimestampsDoFn<T>(fn, allowedTimestampSkew)))
         .setTypeDescriptorInternal(input.getTypeDescriptor());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
index d7ac5e4..ba4a4a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
@@ -128,9 +128,8 @@ public class CoGroupByKey<K> extends
         flattenedTable.apply(GroupByKey.<K, RawUnionValue>create());
 
     CoGbkResultSchema tupleTags = input.getCoGbkResultSchema();
-    PCollection<KV<K, CoGbkResult>> result = groupedTable.apply(
-        ParDo.of(new ConstructCoGbkResultFn<K>(tupleTags))
-          .named("ConstructCoGbkResultFn"));
+    PCollection<KV<K, CoGbkResult>> result = groupedTable.apply("ConstructCoGbkResultFn",
+        ParDo.of(new ConstructCoGbkResultFn<K>(tupleTags)));
     result.setCoder(KvCoder.of(keyCoder,
         CoGbkResultCoder.of(tupleTags, unionCoder)));
 
@@ -163,9 +162,8 @@ public class CoGroupByKey<K> extends
       PCollection<KV<K, V>> pCollection,
       KvCoder<K, RawUnionValue> unionTableEncoder) {
 
-    return pCollection.apply(ParDo.of(
-        new ConstructUnionTableFn<K, V>(index)).named("MakeUnionTable" + index))
-                                               .setCoder(unionTableEncoder);
+    return pCollection.apply("MakeUnionTable" + index,
+        ParDo.of(new ConstructUnionTableFn<K, V>(index))).setCoder(unionTableEncoder);
   }
 
   /**

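Because the name is now just the first argument to apply(), it can be computed at application time, as in "MakeUnionTable" + index above, which keeps each per-input application uniquely named. A small sketch of the same pattern using hypothetical names (shards is an assumed List<PCollection<String>> and TagWithShardIndexFn an assumed DoFn<String, KV<Integer, String>>):

    List<PCollection<KV<Integer, String>>> tagged = new ArrayList<>();
    for (int i = 0; i < shards.size(); i++) {
      // The loop index makes each application's name distinct.
      tagged.add(shards.get(i).apply("TagShard" + i,
          ParDo.of(new TagWithShardIndexFn(i))));
    }
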
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 324b4d5..7d790d4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -645,16 +645,6 @@ public class Window {
     }
   }
 
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static <T> PTransform<PCollection<? extends T>, PCollection<T>> identity() {
-    return ParDo.named("Identity").of(new DoFn<T, T>() {
-      @Override public void processElement(ProcessContext c) {
-        c.output(c.element());
-      }
-    });
-  }
-
   /**
    * Creates a {@code Window} {@code PTransform} that does not change assigned
    * windows, but will cause windows to be merged again as part of the next
@@ -675,7 +665,16 @@ public class Window {
       WindowingStrategy<?, ?> outputWindowingStrategy = getOutputWindowing(
           input.getWindowingStrategy());
 
-      return input.apply(Window.<T>identity())
+      return input
+          // We first apply a (trivial) transform to the input PCollection to produce a new
+          // PCollection. This ensures that we don't modify the windowing strategy of the input
+          // which may be used elsewhere.
+          .apply("Identity", ParDo.of(new DoFn<T, T>() {
+            @Override public void processElement(ProcessContext c) {
+              c.output(c.element());
+            }
+          }))
+          // Then we modify the windowing strategy.
           .setWindowingStrategyInternal(outputWindowingStrategy);
     }
 

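The identity ParDo inlined into Remerge above preserves what the removed private helper did: it yields a fresh PCollection, so setWindowingStrategyInternal does not alter the windowing strategy of the caller's input, which may still be used elsewhere. From user code this transform is reached through Window.remerge(); a brief usage sketch, assuming an already-windowed PCollection<KV<String, Long>> named windowedCounts:

    // Re-enable window merging for the next grouping operation without changing window assignments.
    PCollection<KV<String, Long>> remerged =
        windowedCounts.apply("Remerge", Window.<KV<String, Long>>remerge());
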
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index c0d159b..6c58689 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
 import org.joda.time.Duration;
 
 /**
@@ -68,7 +69,7 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
         // Set the windowing strategy directly, so that it doesn't get counted as the user having
         // set allowed lateness.
         .setWindowingStrategyInternal(originalStrategy)
-        .apply(ParDo.named("ExpandIterable").of(
+        .apply("ExpandIterable", ParDo.of(
             new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
               @Override
               public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
index 8e4e134..80dfcae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
@@ -21,9 +21,6 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
@@ -142,15 +139,11 @@ public class ValueWithRecordId<ValueT> {
     ByteArrayCoder idCoder;
   }
 
-  public static <T>
-      PTransform<PCollection<? extends ValueWithRecordId<T>>, PCollection<T>> stripIds() {
-    return ParDo.named("StripIds")
-        .of(
-            new DoFn<ValueWithRecordId<T>, T>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                c.output(c.element().getValue());
-              }
-            });
+  /** {@link DoFn} to turn a {@code ValueWithRecordId<T>} back to the value {@code T}. */
+  public static class StripIdsDoFn<T> extends DoFn<ValueWithRecordId<T>, T> {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(c.element().getValue());
+    }
   }
 }

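With the stripIds() factory removed, callers can wrap the new public StripIdsDoFn in a ParDo themselves and supply the step name at apply() time. A minimal sketch, assuming a PCollection<ValueWithRecordId<String>> named withIds:

    // Unwrap each ValueWithRecordId<String> back to its underlying String value.
    PCollection<String> values = withIds.apply("StripIds",
        ParDo.of(new ValueWithRecordId.StripIdsDoFn<String>()));
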
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 03ecf6f..db32fa6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -24,6 +24,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.include
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
+
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.equalTo;
@@ -815,37 +816,22 @@ public class ParDoTest implements Serializable {
         .setName("MyInput");
 
     {
-      PCollection<String> output1 =
-          input
-          .apply(ParDo.of(new TestDoFn()));
+      PCollection<String> output1 = input.apply(ParDo.of(new TestDoFn()));
       assertEquals("ParDo(Test).out", output1.getName());
     }
 
     {
-      PCollection<String> output2 =
-          input
-          .apply(ParDo.named("MyParDo").of(new TestDoFn()));
+      PCollection<String> output2 = input.apply("MyParDo", ParDo.of(new TestDoFn()));
       assertEquals("MyParDo.out", output2.getName());
     }
 
     {
-      PCollection<String> output3 =
-          input
-          .apply(ParDo.of(new TestDoFn()).named("HerParDo"));
-      assertEquals("HerParDo.out", output3.getName());
-    }
-
-    {
-      PCollection<String> output4 =
-          input
-              .apply(ParDo.of(new TestDoFn()).named("TestDoFn"));
+      PCollection<String> output4 = input.apply("TestDoFn", ParDo.of(new TestDoFn()));
       assertEquals("TestDoFn.out", output4.getName());
     }
 
     {
-      PCollection<String> output5 =
-          input
-              .apply(ParDo.of(new StrangelyNamedDoer()));
+      PCollection<String> output5 = input.apply(ParDo.of(new StrangelyNamedDoer()));
       assertEquals("ParDo(StrangelyNamedDoer).out",
           output5.getName());
     }
@@ -869,8 +855,7 @@ public class ParDoTest implements Serializable {
 
     PCollectionTuple outputs = p
         .apply(Create.of(Arrays.asList(3, -42, 666))).setName("MyInput")
-        .apply(ParDo
-               .named("MyParDo")
+        .apply("MyParDo", ParDo
                .of(new TestDoFn(
                    Arrays.<PCollectionView<Integer>>asList(),
                    Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3)))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 5377f23..21f58df 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -52,7 +52,6 @@ import java.io.Serializable;
 
 /** Unit tests for bucketing. */
 @RunWith(JUnit4.class)
-@SuppressWarnings("unchecked")
 public class WindowingTest implements Serializable {
   @Rule
   public transient TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -73,12 +72,11 @@ public class WindowingTest implements Serializable {
     }
     @Override
     public PCollection<String> apply(PCollection<String> in) {
-      return in.apply(
-              Window.named("Window")
-                  .<String>into(windowFn)
+      return in.apply("Window",
+              Window.<String>into(windowFn)
                   .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
           .apply(Count.<String>perElement())
-          .apply(ParDo.named("FormatCounts").of(new FormatCountsDoFn()))
+          .apply("FormatCounts", ParDo.of(new FormatCountsDoFn()))
           .setCoder(StringUtf8Coder.of());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
index 98af2e7..be32afa 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
@@ -82,7 +82,7 @@ public class MinimalWordCount {
      // DoFn (defined in-line) on each element that tokenizes the text line into individual words.
      // The ParDo returns a PCollection<String>, where each element is an individual word in
      // Shakespeare's collected texts.
-     .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
+     .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
                        @Override
                        public void processElement(ProcessContext c) {
                          for (String word : c.element().split("[^a-zA-Z']+")) {
@@ -98,7 +98,7 @@ public class MinimalWordCount {
      .apply(Count.<String>perElement())
      // Apply another ParDo transform that formats our PCollection of word counts into a printable
      // string, suitable for writing to an output file.
-     .apply(ParDo.named("FormatResults").of(new DoFn<KV<String, Long>, String>() {
+     .apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() {
                        @Override
                        public void processElement(ProcessContext c) {
                          c.output(c.element().getKey() + ": " + c.element().getValue());