You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/06/27 17:31:07 UTC

[1/3] incubator-beam git commit: Remove many definitions of named methods

Repository: incubator-beam
Updated Branches:
  refs/heads/master 4f580f5f1 -> 9abd0926a


Remove many definitions of named methods

Specifically, remove the occurrences in:
  - Window
  - AvroIO
  - PubsubIO
  - TextIO
  - BigQueryIO
  - Read


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fc52a102
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fc52a102
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fc52a102

Branch: refs/heads/master
Commit: fc52a10259cd045f4b55ec59b2ae87c02c926ed4
Parents: 5719535
Author: Ben Chambers <bc...@google.com>
Authored: Thu Jun 23 17:55:24 2016 -0700
Committer: Ben Chambers <bc...@google.com>
Committed: Sun Jun 26 10:06:35 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 53 ++++---------------
 .../java/org/apache/beam/sdk/io/BigQueryIO.java | 42 +--------------
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 35 +------------
 .../main/java/org/apache/beam/sdk/io/Read.java  | 29 +----------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 55 ++++----------------
 .../org/apache/beam/sdk/io/package-info.java    |  6 +--
 .../beam/sdk/transforms/windowing/Window.java   | 42 ---------------
 7 files changed, 25 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc52a102/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 4b40c01..604051b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -55,9 +55,7 @@ import javax.annotation.Nullable;
  * {@link AvroIO.Read}, specifying {@link AvroIO.Read#from} to specify
  * the path of the file(s) to read from (e.g., a local filename or
  * filename pattern if running locally, or a Google Cloud Storage
- * filename or filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}), and optionally
- * {@link AvroIO.Read#named} to specify the name of the pipeline step.
+ * filename or filename pattern of the form {@code "gs://<bucket>/<filepath>"}).
  *
  * <p>It is required to specify {@link AvroIO.Read#withSchema}. To
  * read specific records, such as Avro-generated classes, provide an
@@ -73,15 +71,15 @@ import javax.annotation.Nullable;
  * // A simple Read of a local file (only runs locally):
  * PCollection<AvroAutoGenClass> records =
  *     p.apply(AvroIO.Read.from("/path/to/file.avro")
- *                        .withSchema(AvroAutoGenClass.class));
+ *                 .withSchema(AvroAutoGenClass.class));
  *
  * // A Read from a GCS file (runs locally and via the Google Cloud
  * // Dataflow service):
  * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
  * PCollection<GenericRecord> records =
- *     p.apply(AvroIO.Read.named("ReadFromAvro")
- *                        .from("gs://my_bucket/path/to/records-*.avro")
- *                        .withSchema(schema));
+ *     p.apply(AvroIO.Read
+ *                .from("gs://my_bucket/path/to/records-*.avro")
+ *                .withSchema(schema));
  * } </pre>
  *
  * <p>To write a {@link PCollection} to one or more Avro files, use
@@ -110,10 +108,10 @@ import javax.annotation.Nullable;
  * // Dataflow service):
  * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
  * PCollection<GenericRecord> records = ...;
- * records.apply(AvroIO.Write.named("WriteToAvro")
- *                           .to("gs://my_bucket/path/to/numbers")
- *                           .withSchema(schema)
- *                           .withSuffix(".avro"));
+ * records.apply("WriteToAvro", AvroIO.Write
+ *     .to("gs://my_bucket/path/to/numbers")
+ *     .withSchema(schema)
+ *     .withSuffix(".avro"));
  * } </pre>
  *
  * <p><h3>Permissions</h3>
@@ -128,12 +126,6 @@ public class AvroIO {
    * the decoding of each record.
    */
   public static class Read {
-    /**
-     * Returns a {@link PTransform} with the given step name.
-     */
-    public static Bound<GenericRecord> named(String name) {
-      return new Bound<>(GenericRecord.class).named(name);
-    }
 
     /**
      * Returns a {@link PTransform} that reads from the file(s)
@@ -223,16 +215,6 @@ public class AvroIO {
 
       /**
        * Returns a new {@link PTransform} that's like this one but
-       * with the given step name.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> named(String name) {
-        return new Bound<>(name, filepattern, type, schema, validate);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
        * that reads from the file(s) with the given name or pattern.
        * (See {@link AvroIO.Read#from} for a description of
        * filepatterns.)
@@ -366,12 +348,6 @@ public class AvroIO {
    * multiple Avro files matching a sharding pattern).
    */
   public static class Write {
-    /**
-     * Returns a {@link PTransform} with the given step name.
-     */
-    public static Bound<GenericRecord> named(String name) {
-      return new Bound<>(GenericRecord.class).named(name);
-    }
 
     /**
      * Returns a {@link PTransform} that writes to the file(s)
@@ -522,17 +498,6 @@ public class AvroIO {
 
       /**
        * Returns a new {@link PTransform} that's like this one but
-       * with the given step name.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> named(String name) {
-        return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
        * that writes to the file(s) with the given filename prefix.
        *
        * <p>See {@link AvroIO.Write#to(String)} for more information

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc52a102/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 7cac705..a9d85b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -162,8 +162,7 @@ import javax.annotation.Nullable;
  * This produces a {@link PCollection} of {@link TableRow TableRows} as output:
  * <pre>{@code
  * PCollection<TableRow> shakespeare = pipeline.apply(
- *     BigQueryIO.Read.named("Read")
- *                    .from("clouddataflow-readonly:samples.weather_stations"));
+ *     BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
  * }</pre>
  *
  * <p>See {@link TableRow} for more information on the {@link TableRow} object.
@@ -174,8 +173,7 @@ import javax.annotation.Nullable;
  *
  * <pre>{@code
  * PCollection<TableRow> shakespeare = pipeline.apply(
- *     BigQueryIO.Read.named("Read")
- *                    .fromQuery("SELECT year, mean_temp FROM samples.weather_stations"));
+ *     BigQueryIO.Read.fromQuery("SELECT year, mean_temp FROM samples.weather_stations"));
  * }</pre>
  *
  * <p>When creating a BigQuery input transform, users should provide either a query or a table.
@@ -193,7 +191,6 @@ import javax.annotation.Nullable;
  * TableSchema schema = new TableSchema().setFields(fields);
  *
  * quotes.apply(BigQueryIO.Write
- *     .named("Write")
  *     .to("my-project:output.output_table")
  *     .withSchema(schema)
  *     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
@@ -214,7 +211,6 @@ import javax.annotation.Nullable;
  * PCollection<TableRow> quotes = ...
  * quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
  *       .apply(BigQueryIO.Write
- *         .named("Write")
  *         .withSchema(schema)
  *         .to(new SerializableFunction<BoundedWindow, String>() {
  *           public String apply(BoundedWindow window) {
@@ -345,13 +341,6 @@ public class BigQueryIO {
    * }}</pre>
    */
   public static class Read {
-    /**
-     * Returns a {@link Read.Bound} with the given name. The BigQuery table or query to be read
-     * from has not yet been configured.
-     */
-    public static Bound named(String name) {
-      return new Bound().named(name);
-    }
 
     /**
      * Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or
@@ -429,15 +418,6 @@ public class BigQueryIO {
       }
 
       /**
-       * Returns a copy of this transform using the name associated with this transformation.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound named(String name) {
-        return new Bound(name, query, jsonTableRef, validate, flattenResults, testBigQueryServices);
-      }
-
-      /**
        * Returns a copy of this transform that reads from the specified table. Refer to
        * {@link #parseTableSpec(String)} for the specification format.
        *
@@ -1372,14 +1352,6 @@ public class BigQueryIO {
     }
 
     /**
-     * Creates a write transformation with the given transform name. The BigQuery table to be
-     * written has not yet been configured.
-     */
-    public static Bound named(String name) {
-      return new Bound().named(name);
-    }
-
-    /**
      * Creates a write transformation for the given table specification.
      *
      * <p>Refer to {@link #parseTableSpec(String)} for the specification format.
@@ -1522,16 +1494,6 @@ public class BigQueryIO {
       }
 
       /**
-       * Returns a copy of this write transformation, but with the specified transform name.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound named(String name) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, testBigQueryServices);
-      }
-
-      /**
        * Returns a copy of this write transformation, but writing to the specified table. Refer to
        * {@link #parseTableSpec(String)} for the specification format.
        *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc52a102/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index c6de8b4..ecb1f0a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -55,6 +55,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
 import javax.annotation.Nullable;
 
 /**
@@ -369,13 +370,6 @@ public class PubsubIO {
    * {@link Bound#maxNumRecords(int)} or {@link Bound#maxReadTime(Duration)} must be set.
    */
   public static class Read {
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub with the specified transform
-     * name.
-     */
-    public static Bound<String> named(String name) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).named(name);
-    }
 
     /**
      * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
@@ -531,16 +525,6 @@ public class PubsubIO {
       }
 
       /**
-       * Returns a transform that's like this one but with the given step name.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> named(String name) {
-        return new Bound<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
-      }
-
-      /**
        * Returns a transform that's like this one but reading from the
        * given subscription.
        *
@@ -834,13 +818,6 @@ public class PubsubIO {
   // TODO: Support non-String encodings.
   public static class Write {
     /**
-     * Creates a transform that writes to Pub/Sub with the given step name.
-     */
-    public static Bound<String> named(String name) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).named(name);
-    }
-
-    /**
      * Creates a transform that publishes to the specified topic.
      *
      * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
@@ -917,16 +894,6 @@ public class PubsubIO {
       }
 
       /**
-       * Returns a new transform that's like this one but with the specified step
-       * name.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> named(String name) {
-        return new Bound<>(name, topic, timestampLabel, idLabel, coder);
-      }
-
-      /**
        * Returns a new transform that's like this one but that writes to the specified
        * topic.
        *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc52a102/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index c0440f2..e13ff06 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -38,17 +38,10 @@ import javax.annotation.Nullable;
  * <p>Usage example:
  * <pre>
  * Pipeline p = Pipeline.create();
- * p.apply(Read.from(new MySource().withFoo("foo").withBar("bar"))
- *             .named("foobar"));
+ * p.apply(Read.from(new MySource().withFoo("foo").withBar("bar")));
  * </pre>
  */
 public class Read {
-  /**
-   * Returns a new {@code Read} {@code PTransform} builder with the given name.
-   */
-  public static Builder named(String name) {
-    return new Builder(name);
-  }
 
   /**
    * Returns a new {@code Read.Bounded} {@code PTransform} reading from the given
@@ -104,16 +97,6 @@ public class Read {
       this.source = SerializableUtils.ensureSerializable(source);
     }
 
-    /**
-     * Returns a new {@code Bounded} {@code PTransform} that's like this one but
-     * has the given name.
-     *
-     * <p>Does not modify this object.
-     */
-    public Bounded<T> named(String name) {
-      return new Bounded<T>(name, source);
-    }
-
     @Override
     protected Coder<T> getDefaultOutputCoder() {
       return source.getDefaultOutputCoder();
@@ -162,16 +145,6 @@ public class Read {
     }
 
     /**
-     * Returns a new {@code Unbounded} {@code PTransform} that's like this one but
-     * has the given name.
-     *
-     * <p>Does not modify this object.
-     */
-    public Unbounded<T> named(String name) {
-      return new Unbounded<T>(name, source);
-    }
-
-    /**
      * Returns a new {@link BoundedReadFromUnboundedSource} that reads a bounded amount
      * of data from the given {@link UnboundedSource}.  The bound is specified as a number
      * of records to read.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc52a102/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index a7e5e29..7e7a3e6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -58,8 +58,7 @@ import javax.annotation.Nullable;
  * the path of the file(s) to read from (e.g., a local filename or
  * filename pattern if running locally, or a Google Cloud Storage
  * filename or filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). You may optionally call
- * {@link TextIO.Read#named(String)} to specify the name of the pipeline step.
+ * {@code "gs://<bucket>/<filepath>"}).
  *
  * <p>By default, {@link TextIO.Read} returns a {@link PCollection} of {@link String Strings},
  * each corresponding to one line of an input UTF-8 text file. To convert directly from the raw
@@ -78,9 +77,9 @@ import javax.annotation.Nullable;
  * // A fully-specified Read from a GCS file (runs locally and via the
  * // Google Cloud Dataflow service):
  * PCollection<Integer> numbers =
- *     p.apply(TextIO.Read.named("ReadNumbers")
- *                        .from("gs://my_bucket/path/to/numbers-*.txt")
- *                        .withCoder(TextualIntegerCoder.of()));
+ *     p.apply("ReadNumbers", TextIO.Read
+ *         .from("gs://my_bucket/path/to/numbers-*.txt")
+ *         .withCoder(TextualIntegerCoder.of()));
  * }</pre>
  *
  * <p>To write a {@link PCollection} to one or more text files, use
@@ -88,9 +87,8 @@ import javax.annotation.Nullable;
  * the path of the file to write to (e.g., a local filename or sharded
  * filename pattern if running locally, or a Google Cloud Storage
  * filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). You can optionally name the resulting transform using
- * {@link TextIO.Write#named(String)}, and you can use {@link TextIO.Write#withCoder(Coder)}
- * to specify the Coder to use to encode the Java values into text lines.
+ * {@code "gs://<bucket>/<filepath>"}). You can use {@link TextIO.Write#withCoder(Coder)}
+ * to specify the {@link Coder} to use to encode the Java values into text lines.
  *
  * <p>Any existing files with the same names as generated output files
  * will be overwritten.
@@ -104,10 +102,10 @@ import javax.annotation.Nullable;
  * // A fully-specified Write to a sharded GCS file (runs locally and via the
  * // Google Cloud Dataflow service):
  * PCollection<Integer> numbers = ...;
- * numbers.apply(TextIO.Write.named("WriteNumbers")
- *                           .to("gs://my_bucket/path/to/numbers")
- *                           .withSuffix(".txt")
- *                           .withCoder(TextualIntegerCoder.of()));
+ * numbers.apply("WriteNumbers", TextIO.Write
+ *      .to("gs://my_bucket/path/to/numbers")
+ *      .withSuffix(".txt")
+ *      .withCoder(TextualIntegerCoder.of()));
  * }</pre>
  *
  * <h3>Permissions</h3>
@@ -130,12 +128,6 @@ public class TextIO {
    * {@link #withCoder(Coder)} to change the return type.
    */
   public static class Read {
-    /**
-     * Returns a transform for reading text files that uses the given step name.
-     */
-    public static Bound<String> named(String name) {
-      return new Bound<>(DEFAULT_TEXT_CODER).named(name);
-    }
 
     /**
      * Returns a transform for reading text files that reads from the file(s)
@@ -228,16 +220,6 @@ public class TextIO {
 
       /**
        * Returns a new transform for reading from text files that's like this one but
-       * with the given step name.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> named(String name) {
-        return new Bound<>(name, filepattern, coder, validate, compressionType);
-      }
-
-      /**
-       * Returns a new transform for reading from text files that's like this one but
        * that reads from the file(s) with the given name or pattern. See {@link TextIO.Read#from}
        * for a description of filepatterns.
        *
@@ -387,12 +369,6 @@ public class TextIO {
    * element of the input collection encoded into its own line.
    */
   public static class Write {
-    /**
-     * Returns a transform for writing to text files with the given step name.
-     */
-    public static Bound<String> named(String name) {
-      return new Bound<>(DEFAULT_TEXT_CODER).named(name);
-    }
 
     /**
      * Returns a transform for writing to text files that writes to the file(s)
@@ -521,17 +497,6 @@ public class TextIO {
 
       /**
        * Returns a transform for writing to text files that's like this one but
-       * with the given step name.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> named(String name) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
-            shardTemplate, validate);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
        * that writes to the file(s) with the given filename prefix.
        *
        * <p>See {@link TextIO.Write#to(String) Write.to(String)} for more information.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc52a102/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
index c2c0685..432c5df 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
@@ -25,14 +25,12 @@
  * from existing storage:
  * <pre>{@code
  * PCollection<TableRow> inputData = pipeline.apply(
- *     BigQueryIO.Read.named("Read")
- *                    .from("clouddataflow-readonly:samples.weather_stations");
+ *     BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
  * }</pre>
  * and {@code Write} transforms that persist PCollections to external storage:
  * <pre> {@code
  * PCollection<Integer> numbers = ...;
- * numbers.apply(TextIO.Write.named("WriteNumbers")
- *                           .to("gs://my_bucket/path/to/numbers"));
+ * numbers.apply(TextIO.Write.to("gs://my_bucket/path/to/numbers"));
  * } </pre>
  */
 package org.apache.beam.sdk.io;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc52a102/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index dde5c05..bc122e2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -159,21 +159,6 @@ public class Window {
   }
 
   /**
-   * Creates a {@code Window} {@code PTransform} with the given name.
-   *
-   * <p>See the discussion of Naming in
-   * {@link org.apache.beam.sdk.transforms.ParDo} for more explanation.
-   *
-   * <p>The resulting {@code PTransform} is incomplete, and its input/output
-   * type is not yet bound.  Use {@link Window.Unbound#into} to specify the
-   * {@link WindowFn} to use, which will also bind the input/output type of this
-   * {@code PTransform}.
-   */
-  public static Unbound named(String name) {
-    return new Unbound().named(name);
-  }
-
-  /**
    * Creates a {@code Window} {@code PTransform} that uses the given
    * {@link WindowFn} to window the data.
    *
@@ -255,19 +240,6 @@ public class Window {
     }
 
     /**
-     * Returns a new {@code Window} transform that's like this
-     * transform but with the specified name.  Does not modify this
-     * transform.  The resulting transform is still incomplete.
-     *
-     * <p>See the discussion of Naming in
-     * {@link org.apache.beam.sdk.transforms.ParDo} for more
-     * explanation.
-     */
-    public Unbound named(String name) {
-      return new Unbound(name);
-    }
-
-    /**
      * Returns a new {@code Window} {@code PTransform} that's like this
      * transform but that will use the given {@link WindowFn}, and that has
      * its input and output types bound.  Does not modify this transform.  The
@@ -408,20 +380,6 @@ public class Window {
     }
 
     /**
-     * Returns a new {@code Window} {@code PTransform} that's like this
-     * {@code PTransform} but with the specified name.  Does not
-     * modify this {@code PTransform}.
-     *
-     * <p>See the discussion of Naming in
-     * {@link org.apache.beam.sdk.transforms.ParDo} for more
-     * explanation.
-     */
-    public Bound<T> named(String name) {
-      return new Bound<>(
-          name, windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn);
-    }
-
-    /**
      * Sets a non-default trigger for this {@code Window} {@code PTransform}.
      * Elements that are assigned to a specific window will be output when
      * the trigger fires.


[2/3] incubator-beam git commit: Remove many uses of .named methods

Posted by bc...@apache.org.
Remove many uses of .named methods

Specifically, remove uses of:
  - Window.named
  - AvroIO.named
  - PubSubIO.named
  - TextIO.named
  - BigQueryIO.named
  - Read.named


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/57195358
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/57195358
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/57195358

Branch: refs/heads/master
Commit: 57195358592548e6f7e05bc8e4e292b126a726c5
Parents: 4f580f5
Author: Ben Chambers <bc...@google.com>
Authored: Thu Jun 23 22:27:05 2016 -0700
Committer: Ben Chambers <bc...@google.com>
Committed: Sun Jun 26 10:06:35 2016 -0700

----------------------------------------------------------------------
 .../beam/examples/DebuggingWordCount.java       |   2 +-
 .../org/apache/beam/examples/WordCount.java     |   4 +-
 .../apache/beam/examples/complete/TfIdf.java    |   3 +-
 .../examples/complete/TopWikipediaSessions.java |   2 +-
 .../examples/cookbook/DatastoreWordCount.java   |   2 +-
 .../beam/examples/cookbook/DeDupExample.java    |   5 +-
 .../beam/examples/cookbook/TriggerExample.java  |   4 +-
 .../beam/examples/complete/game/GameStats.java  |  28 ++-
 .../examples/complete/game/HourlyTeamScore.java |   5 +-
 .../examples/complete/game/LeaderBoard.java     |   8 +-
 .../beam/runners/flink/examples/TFIDF.java      |   3 +-
 .../beam/runners/flink/examples/WordCount.java  |   4 +-
 .../flink/examples/streaming/AutoComplete.java  |   9 +-
 .../flink/examples/streaming/JoinExamples.java  |  13 +-
 .../KafkaWindowedWordCountExample.java          |   2 +-
 .../examples/streaming/WindowedWordCount.java   |   3 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   2 +-
 .../DataflowPipelineTranslatorTest.java         |   6 +-
 .../runners/dataflow/DataflowRunnerTest.java    |  18 +-
 .../beam/runners/spark/SimpleWordCountTest.java |   3 +-
 .../java/org/apache/beam/sdk/io/BigQueryIO.java |   1 -
 .../beam/sdk/io/AvroIOGeneratedClassTest.java   | 192 +++++--------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |   5 +-
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  |  82 +++-----
 .../apache/beam/sdk/io/FileBasedSourceTest.java |   5 +-
 .../org/apache/beam/sdk/io/PubsubIOTest.java    |   4 -
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  37 +---
 .../org/apache/beam/sdk/io/XmlSourceTest.java   |  19 +-
 .../beam/sdk/runners/TransformTreeTest.java     |   4 +-
 .../sdk/transforms/windowing/WindowTest.java    |   6 +-
 .../sdk/transforms/windowing/WindowingTest.java |   2 +-
 .../src/main/java/DebuggingWordCount.java       |   2 +-
 .../src/main/java/WordCount.java                |   4 +-
 33 files changed, 158 insertions(+), 331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 85823c2..8d85d44 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -173,7 +173,7 @@ public class DebuggingWordCount {
     Pipeline p = Pipeline.create(options);
 
     PCollection<KV<String, Long>> filteredWords =
-        p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+        p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
          .apply(new WordCount.CountWords())
          .apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index cf6c45a..af16c44 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -205,10 +205,10 @@ public class WordCount {
 
     // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
     // static FormatAsTextFn() to the ParDo transform.
-    p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
      .apply(new CountWords())
      .apply(MapElements.via(new FormatAsTextFn()))
-     .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
+     .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
 
     p.run();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 23653d6..8305314 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -187,8 +187,7 @@ public class TfIdf {
         }
 
         PCollection<KV<URI, String>> oneUriToLines = pipeline
-            .apply(TextIO.Read.from(uriString)
-                .named("TextIO.Read(" + uriString + ")"))
+            .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString))
             .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
 
         urisToLines = urisToLines.and(oneUriToLines);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index 5d95e3f..80b3ade 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -217,7 +217,7 @@ public class TopWikipediaSessions {
         .from(options.getInput())
         .withCoder(TableRowJsonCoder.of()))
      .apply(new ComputeTopSessions(samplingThreshold))
-     .apply(TextIO.Write.named("Write").withoutSharding().to(options.getOutput()));
+     .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput()));
 
     p.run();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
index 7578d79..b070f94 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
@@ -193,7 +193,7 @@ public class DatastoreWordCount {
    */
   public static void writeDataToDatastore(Options options) {
       Pipeline p = Pipeline.create(options);
-      p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+      p.apply("ReadLines", TextIO.Read.from(options.getInput()))
        .apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind())))
        .apply(DatastoreIO.writeTo(options.getProject()));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
index db65543..d573bcd 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
@@ -89,10 +89,9 @@ public class DeDupExample {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     Pipeline p = Pipeline.create(options);
 
-    p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+    p.apply("ReadLines", TextIO.Read.from(options.getInput()))
      .apply(RemoveDuplicates.<String>create())
-     .apply(TextIO.Write.named("DedupedShakespeare")
-         .to(options.getOutput()));
+     .apply("DedupedShakespeare", TextIO.Write.to(options.getOutput()));
 
     p.run();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index ab1fb66..ff4909b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -467,7 +467,7 @@ public class TriggerExample {
     TableReference tableRef = getTableReference(options.getProject(),
         options.getBigQueryDataset(), options.getBigQueryTable());
 
-    PCollectionList<TableRow> resultList = pipeline.apply(PubsubIO.Read.named("ReadPubsubInput")
+    PCollectionList<TableRow> resultList = pipeline.apply("ReadPubsubInput", PubsubIO.Read
         .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
         .topic(options.getPubsubTopic()))
         .apply(ParDo.of(new ExtractFlowInfo()))
@@ -493,7 +493,7 @@ public class TriggerExample {
     copiedOptions.setJobName(options.getJobName() + "-injector");
     Pipeline injectorPipeline = Pipeline.create(copiedOptions);
     injectorPipeline
-    .apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
+    .apply("ReadMyFile", TextIO.Read.from(options.getInput()))
     .apply("InsertRandomDelays", ParDo.of(new InsertDelays()))
     .apply(IntraBundleParallelization.of(PubsubFileInjector
         .withTimestampLabelKey(PUBSUB_TIMESTAMP_LABEL_KEY)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index ad8b49e..b1cb312 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -260,10 +260,8 @@ public class GameStats extends LeaderBoard {
     // Calculate the total score per user over fixed windows, and
     // cumulative updates for late data.
     final PCollectionView<Map<String, Integer>> spammersView = userEvents
-      .apply(Window.named("FixedWindowsUser")
-          .<KV<String, Integer>>into(FixedWindows.of(
-              Duration.standardMinutes(options.getFixedWindowDuration())))
-          )
+      .apply("FixedWindowsUser", Window.<KV<String, Integer>>into(
+          FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
 
       // Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
       // These might be robots/spammers.
@@ -278,10 +276,8 @@ public class GameStats extends LeaderBoard {
     // suspected robots-- to filter out scores from those users from the sum.
     // Write the results to BigQuery.
     rawEvents
-      .apply(Window.named("WindowIntoFixedWindows")
-          .<GameActionInfo>into(FixedWindows.of(
-              Duration.standardMinutes(options.getFixedWindowDuration())))
-          )
+      .apply("WindowIntoFixedWindows", Window.<GameActionInfo>into(
+          FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
       // Filter out the detected spammer users, using the side input derived above.
       .apply("FilterOutSpammers", ParDo
               .withSideInputs(spammersView)
@@ -299,8 +295,8 @@ public class GameStats extends LeaderBoard {
       // [END DocInclude_FilterAndCalc]
       // Write the result to BigQuery
       .apply("WriteTeamSums",
-             new WriteWindowedToBigQuery<KV<String, Integer>>(
-                options.getTablePrefix() + "_team", configureWindowedWrite()));
+          new WriteWindowedToBigQuery<KV<String, Integer>>(
+              options.getTablePrefix() + "_team", configureWindowedWrite()));
 
 
     // [START DocInclude_SessionCalc]
@@ -309,10 +305,9 @@ public class GameStats extends LeaderBoard {
     // This information could help the game designers track the changing user engagement
     // as their set of games changes.
     userEvents
-      .apply(Window.named("WindowIntoSessions")
-            .<KV<String, Integer>>into(
-                  Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
-        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
+      .apply("WindowIntoSessions", Window.<KV<String, Integer>>into(
+          Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
+          .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
       // For this use, we care only about the existence of the session, not any particular
       // information aggregated over it, so the following is an efficient way to do that.
       .apply(Combine.perKey(x -> 0))
@@ -321,9 +316,8 @@ public class GameStats extends LeaderBoard {
       // [END DocInclude_SessionCalc]
       // [START DocInclude_Rewindow]
       // Re-window to process groups of session sums according to when the sessions complete.
-      .apply(Window.named("WindowToExtractSessionMean")
-            .<Integer>into(
-                FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
+      .apply("WindowToExtractSessionMean", Window.<Integer>into(
+          FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
       // Find the mean session duration in each window.
       .apply(Mean.<Integer>globally().withoutDefaults())
       // Write this info to a BigQuery table.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index 7a808ac..e489607 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -178,9 +178,8 @@ public class HourlyTeamScore extends UserScore {
       // Add an element timestamp based on the event log, and apply fixed windowing.
       .apply("AddEventTimestamps",
              WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
-      .apply(Window.named("FixedWindowsTeam")
-          .<GameActionInfo>into(FixedWindows.of(
-                Duration.standardMinutes(options.getWindowDuration()))))
+      .apply("FixedWindowsTeam", Window.<GameActionInfo>into(
+          FixedWindows.of(Duration.standardMinutes(options.getWindowDuration()))))
       // [END DocInclude_HTSAddTsAndWindow]
 
       // Extract and sum teamname/score pairs from the event data.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 2c608aa..a14d533 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -190,9 +190,8 @@ public class LeaderBoard extends HourlyTeamScore {
     // [START DocInclude_WindowAndTrigger]
     // Extract team/score pairs from the event stream, using hour-long windows by default.
     gameEvents
-        .apply(Window.named("LeaderboardTeamFixedWindows")
-          .<GameActionInfo>into(FixedWindows.of(
-              Duration.standardMinutes(options.getTeamWindowDuration())))
+        .apply("LeaderboardTeamFixedWindows", Window.<GameActionInfo>into(
+            FixedWindows.of(Duration.standardMinutes(options.getTeamWindowDuration())))
           // We will get early (speculative) results as well as cumulative
           // processing of late data.
           .triggering(
@@ -215,8 +214,7 @@ public class LeaderBoard extends HourlyTeamScore {
     // Extract user/score pairs from the event stream using processing time, via global windowing.
     // Get periodic updates on all users' running scores.
     gameEvents
-        .apply(Window.named("LeaderboardUserGlobalWindow")
-          .<GameActionInfo>into(new GlobalWindows())
+        .apply("LeaderboardUserGlobalWindow", Window.<GameActionInfo>into(new GlobalWindows())
           // Get periodic results every ten minutes.
               .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                   .plusDelayOf(TEN_MINUTES)))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 084ac7c..56737a4 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -191,8 +191,7 @@ public class TFIDF {
         }
 
         PCollection<KV<URI, String>> oneUriToLines = pipeline
-            .apply(TextIO.Read.from(uriString)
-                .named("TextIO.Read(" + uriString + ")"))
+            .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString))
             .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
 
         urisToLines = urisToLines.and(oneUriToLines);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index 2817622..2d95c97 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -109,10 +109,10 @@ public class WordCount {
 
     Pipeline p = Pipeline.create(options);
 
-    p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+    p.apply("ReadLines", TextIO.Read.from(options.getInput()))
         .apply(new CountWords())
         .apply(MapElements.via(new FormatAsTextFn()))
-        .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
+        .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
 
     p.run();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index ed11781..c0ff85d 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -44,7 +44,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 
@@ -380,14 +379,14 @@ public class AutoComplete {
     options.setExecutionRetryDelay(3000L);
     options.setRunner(FlinkRunner.class);
 
-    PTransform<? super PBegin, PCollection<String>> readSource =
-            Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream");
-    WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+
+    WindowFn<Object, ?> windowFn =
+        FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
 
     // Create the pipeline.
     Pipeline p = Pipeline.create(options);
     PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
-      .apply(readSource)
+      .apply("WordStream", Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
       .apply(ParDo.of(new ExtractWordsFn()))
       .apply(Window.<String>into(windowFn)
               .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index 0828ddc..f456b27 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -34,7 +33,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -135,22 +133,19 @@ public class JoinExamples {
     options.setExecutionRetryDelay(3000L);
     options.setRunner(FlinkRunner.class);
 
-    PTransform<? super PBegin, PCollection<String>> readSourceA =
-        Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream");
-    PTransform<? super PBegin, PCollection<String>> readSourceB =
-        Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream");
-
     WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
 
     Pipeline p = Pipeline.create(options);
 
     // the following two 'applys' create multiple inputs to our pipeline, one for each
     // of our two input sources.
-    PCollection<String> streamA = p.apply(readSourceA)
+    PCollection<String> streamA = p
+        .apply("FirstStream", Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
         .apply(Window.<String>into(windowFn)
             .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
             .discardingFiredPanes());
-    PCollection<String> streamB = p.apply(readSourceB)
+    PCollection<String> streamB = p
+        .apply("SecondStream", Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)))
         .apply(Window.<String>into(windowFn)
             .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
             .discardingFiredPanes());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index b14c5ae..4e81420 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -132,7 +132,7 @@ public class KafkaWindowedWordCountExample {
         new SimpleStringSchema(), p);
 
     PCollection<String> words = pipeline
-        .apply(Read.named("StreamingWordCount").from(UnboundedFlinkSource.of(kafkaConsumer)))
+        .apply("StreamingWordCount", Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
         .apply(ParDo.of(new ExtractWordsFn()))
         .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
             .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index f72b705..1b532a7 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -119,7 +119,8 @@ public class WindowedWordCount {
     Pipeline pipeline = Pipeline.create(options);
 
     PCollection<String> words = pipeline
-        .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount"))
+        .apply("StreamingWordCount",
+            Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
         .apply(ParDo.of(new ExtractWordsFn()))
         .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize()))
             .every(Duration.standardSeconds(options.getSlide())))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index d47d285..7ff247a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2686,7 +2686,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       try {
         Coder<T> coder = transform.getDefaultOutputCoder(input);
         return Pipeline.applyTransform(
-            input, PubsubIO.Read.named("StartingSignal").subscription("_starting_signal/"))
+            "StartingSignal", input, PubsubIO.Read.subscription("_starting_signal/"))
             .apply(ParDo.of(new OutputNullKv()))
             .apply("GlobalSingleton", Window.<KV<Void, Void>>into(new GlobalWindows())
                 .triggering(AfterPane.elementCountAtLeast(1))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index c3a6a11..e04a1fc 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -127,8 +127,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     options.setRunner(DataflowRunner.class);
     Pipeline p = Pipeline.create(options);
 
-    p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
-        .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
+    p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object"))
+     .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
 
     return p;
   }
@@ -458,7 +458,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
 
     // Create a pipeline that the predefined step will be embedded into
     Pipeline pipeline = Pipeline.create(options);
-    pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
+    pipeline.apply("ReadMyFile", TextIO.Read.from("gs://bucket/in"))
         .apply(ParDo.of(new NoOpFn()))
         .apply(new EmbeddedTransform(predefinedStep.clone()))
         .apply(ParDo.of(new NoOpFn()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index e094d0d..999dc3a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow;
 
 import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -151,8 +152,8 @@ public class DataflowRunnerTest {
     options.setRunner(DataflowRunner.class);
     Pipeline p = Pipeline.create(options);
 
-    p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
-        .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
+    p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object"))
+        .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
 
     return p;
   }
@@ -464,7 +465,7 @@ public class DataflowRunnerTest {
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
 
     Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
-    p.apply(TextIO.Read.named("ReadMyNonGcsFile").from(tmpFolder.newFile().getPath()));
+    p.apply("ReadMyNonGcsFile", TextIO.Read.from(tmpFolder.newFile().getPath()));
 
     thrown.expectCause(Matchers.allOf(
         instanceOf(IllegalArgumentException.class),
@@ -477,11 +478,11 @@ public class DataflowRunnerTest {
   @Test
   public void testNonGcsFilePathInWriteFailure() throws IOException {
     Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
+    PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object"));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(containsString("expected a valid 'gs://' path but was given"));
-    pc.apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file"));
+    pc.apply("WriteMyNonGcsFile", TextIO.Write.to("/tmp/file"));
   }
 
   @Test
@@ -489,8 +490,7 @@ public class DataflowRunnerTest {
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
 
     Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
-    p.apply(TextIO.Read.named("ReadInvalidGcsFile")
-        .from("gs://bucket/tmp//file"));
+    p.apply("ReadInvalidGcsFile", TextIO.Read.from("gs://bucket/tmp//file"));
 
     thrown.expectCause(Matchers.allOf(
         instanceOf(IllegalArgumentException.class),
@@ -502,11 +502,11 @@ public class DataflowRunnerTest {
   @Test
   public void testMultiSlashGcsFileWritePath() throws IOException {
     Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
+    PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object"));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("consecutive slashes");
-    pc.apply(TextIO.Write.named("WriteInvalidGcsFile").to("gs://bucket/tmp//file"));
+    pc.apply("WriteInvalidGcsFile", TextIO.Write.to("gs://bucket/tmp//file"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index da3fa7a..6a3edd7 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -89,8 +89,7 @@ public class SimpleWordCountTest {
     PCollection<String> output = inputWords.apply(new CountWords());
 
     File outputFile = testFolder.newFile();
-    output.apply(
-        TextIO.Write.named("WriteCounts").to(outputFile.getAbsolutePath()).withoutSharding());
+    output.apply("WriteCounts", TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());
 
     EvaluationResult res = SparkRunner.create().run(p);
     res.close();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 6a36c8d..7cac705 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -518,7 +518,6 @@ public class BigQueryIO {
               + " query without a result flattening preference");
         }
 
-        // Only verify existence/correctness if validation is enabled.
         if (validate) {
           // Check for source table/query presence for early failure notification.
           // Note that a presence check can fail if the table or dataset are created by earlier

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
index da886de..6e26d33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
@@ -136,6 +136,18 @@ public class AvroIOGeneratedClassTest {
     return users;
   }
 
+  <T> void runTestRead(
+      String applyName, AvroIO.Read.Bound<T> read, String expectedName, T[] expectedOutput)
+      throws Exception {
+    generateAvroFile(generateAvroObjects());
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<T> output = p.apply(applyName, read);
+    PAssert.that(output).containsInAnyOrder(expectedOutput);
+    p.run();
+    assertEquals(expectedName, output.getName());
+  }
+
   <T> void runTestRead(AvroIO.Read.Bound<T> read, String expectedName, T[] expectedOutput)
       throws Exception {
     generateAvroFile(generateAvroObjects());
@@ -158,28 +170,16 @@ public class AvroIOGeneratedClassTest {
         AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()),
         "AvroIO.Read/Read.out",
         generateAvroObjects());
-    runTestRead(
-        AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
+    runTestRead("MyRead",
+        AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
         "MyRead/Read.out",
         generateAvroObjects());
-    runTestRead(
-        AvroIO.Read.named("MyRead").withSchema(AvroGeneratedUser.class).from(avroFile.getPath()),
+    runTestRead("MyRead",
+        AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()),
         "MyRead/Read.out",
         generateAvroObjects());
-    runTestRead(
-        AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class).named("HerRead"),
-        "HerRead/Read.out",
-        generateAvroObjects());
-    runTestRead(
-        AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(AvroGeneratedUser.class),
-        "HerRead/Read.out",
-        generateAvroObjects());
-    runTestRead(
-        AvroIO.Read.withSchema(AvroGeneratedUser.class).named("HerRead").from(avroFile.getPath()),
-        "HerRead/Read.out",
-        generateAvroObjects());
-    runTestRead(
-        AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()).named("HerRead"),
+    runTestRead("HerRead",
+        AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
         "HerRead/Read.out",
         generateAvroObjects());
   }
@@ -195,28 +195,20 @@ public class AvroIOGeneratedClassTest {
         AvroIO.Read.withSchema(schema).from(avroFile.getPath()),
         "AvroIO.Read/Read.out",
         generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(schema),
+    runTestRead("MyRead",
+        AvroIO.Read.from(avroFile.getPath()).withSchema(schema),
         "MyRead/Read.out",
         generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.named("MyRead").withSchema(schema).from(avroFile.getPath()),
+    runTestRead("MyRead",
+        AvroIO.Read.withSchema(schema).from(avroFile.getPath()),
         "MyRead/Read.out",
         generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.from(avroFile.getPath()).withSchema(schema).named("HerRead"),
-        "HerRead/Read.out",
-        generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(schema),
-        "HerRead/Read.out",
-        generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.withSchema(schema).named("HerRead").from(avroFile.getPath()),
+    runTestRead("HerRead",
+        AvroIO.Read.from(avroFile.getPath()).withSchema(schema),
         "HerRead/Read.out",
         generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.withSchema(schema).from(avroFile.getPath()).named("HerRead"),
+    runTestRead("HerRead",
+        AvroIO.Read.from(avroFile.getPath()).withSchema(schema),
         "HerRead/Read.out",
         generateAvroGenericRecords());
   }
@@ -232,28 +224,12 @@ public class AvroIOGeneratedClassTest {
         AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()),
         "AvroIO.Read/Read.out",
         generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(schemaString),
-        "MyRead/Read.out",
-        generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.named("MyRead").withSchema(schemaString).from(avroFile.getPath()),
+    runTestRead("MyRead",
+        AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString),
         "MyRead/Read.out",
         generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString).named("HerRead"),
-        "HerRead/Read.out",
-        generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(schemaString),
-        "HerRead/Read.out",
-        generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.withSchema(schemaString).named("HerRead").from(avroFile.getPath()),
-        "HerRead/Read.out",
-        generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()).named("HerRead"),
+    runTestRead("HerRead",
+        AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()),
         "HerRead/Read.out",
         generateAvroGenericRecords());
   }
@@ -276,106 +252,34 @@ public class AvroIOGeneratedClassTest {
   @Test
   @Category(NeedsRunner.class)
   public void testWriteFromGeneratedClass() throws Exception {
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .withSchema(AvroGeneratedUser.class),
-                 "AvroIO.Write");
-    runTestWrite(AvroIO.Write.withSchema(AvroGeneratedUser.class)
-                             .to(avroFile.getPath()),
-                 "AvroIO.Write");
-    runTestWrite(AvroIO.Write.named("MyWrite")
-                             .to(avroFile.getPath())
-                             .withSchema(AvroGeneratedUser.class),
-                 "MyWrite");
-    runTestWrite(AvroIO.Write.named("MyWrite")
-                             .withSchema(AvroGeneratedUser.class)
-                             .to(avroFile.getPath()),
-                 "MyWrite");
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .withSchema(AvroGeneratedUser.class)
-                             .named("HerWrite"),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .named("HerWrite")
-                             .withSchema(AvroGeneratedUser.class),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.withSchema(AvroGeneratedUser.class)
-                             .named("HerWrite")
-                             .to(avroFile.getPath()),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.withSchema(AvroGeneratedUser.class)
-                             .to(avroFile.getPath())
-                             .named("HerWrite"),
-                 "HerWrite");
+    runTestWrite(
+        AvroIO.Write.to(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
+        "AvroIO.Write");
+    runTestWrite(
+        AvroIO.Write.withSchema(AvroGeneratedUser.class).to(avroFile.getPath()),
+        "AvroIO.Write");
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testWriteFromSchema() throws Exception {
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .withSchema(schema),
-                 "AvroIO.Write");
-    runTestWrite(AvroIO.Write.withSchema(schema)
-                             .to(avroFile.getPath()),
-                 "AvroIO.Write");
-    runTestWrite(AvroIO.Write.named("MyWrite")
-                             .to(avroFile.getPath())
-                             .withSchema(schema),
-                 "MyWrite");
-    runTestWrite(AvroIO.Write.named("MyWrite")
-                             .withSchema(schema)
-                             .to(avroFile.getPath()),
-                 "MyWrite");
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .withSchema(schema)
-                             .named("HerWrite"),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .named("HerWrite")
-                             .withSchema(schema),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.withSchema(schema)
-                             .named("HerWrite")
-                             .to(avroFile.getPath()),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.withSchema(schema)
-                             .to(avroFile.getPath())
-                             .named("HerWrite"),
-                 "HerWrite");
+    runTestWrite(
+        AvroIO.Write.to(avroFile.getPath()).withSchema(schema),
+        "AvroIO.Write");
+    runTestWrite(
+        AvroIO.Write.withSchema(schema).to(avroFile.getPath()),
+        "AvroIO.Write");
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testWriteFromSchemaString() throws Exception {
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .withSchema(schemaString),
-                 "AvroIO.Write");
-    runTestWrite(AvroIO.Write.withSchema(schemaString)
-                             .to(avroFile.getPath()),
-                 "AvroIO.Write");
-    runTestWrite(AvroIO.Write.named("MyWrite")
-                             .to(avroFile.getPath())
-                             .withSchema(schemaString),
-                 "MyWrite");
-    runTestWrite(AvroIO.Write.named("MyWrite")
-                             .withSchema(schemaString)
-                             .to(avroFile.getPath()),
-                 "MyWrite");
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .withSchema(schemaString)
-                             .named("HerWrite"),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .named("HerWrite")
-                             .withSchema(schemaString),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.withSchema(schemaString)
-                             .named("HerWrite")
-                             .to(avroFile.getPath()),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.withSchema(schemaString)
-                             .to(avroFile.getPath())
-                             .named("HerWrite"),
-                 "HerWrite");
+    runTestWrite(
+        AvroIO.Write.to(avroFile.getPath()).withSchema(schemaString),
+        "AvroIO.Write");
+    runTestWrite(
+        AvroIO.Write.withSchema(schemaString).to(avroFile.getPath()),
+        "AvroIO.Write");
   }
 
   // TODO: for Write only, test withSuffix, withNumShards,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 13c1bcf..8625b10 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -81,10 +82,6 @@ public class AvroIOTest {
   public void testAvroIOGetName() {
     assertEquals("AvroIO.Read", AvroIO.Read.from("gs://bucket/foo*/baz").getName());
     assertEquals("AvroIO.Write", AvroIO.Write.to("gs://bucket/foo/baz").getName());
-    assertEquals("ReadMyFile",
-        AvroIO.Read.named("ReadMyFile").from("gs://bucket/foo*/baz").getName());
-    assertEquals("WriteMyFile",
-        AvroIO.Write.named("WriteMyFile").to("gs://bucket/foo/baz").getName());
   }
 
   @DefaultCoder(AvroCoder.class)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index a1daf72..f0d3fce 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -340,8 +340,7 @@ public class BigQueryIOTest implements Serializable {
     checkReadTableObjectWithValidate(bound, project, dataset, table, true);
   }
 
-  private void checkReadQueryObject(
-      BigQueryIO.Read.Bound bound, String query) {
+  private void checkReadQueryObject(BigQueryIO.Read.Bound bound, String query) {
     checkReadQueryObjectWithValidate(bound, query, true);
   }
 
@@ -393,15 +392,13 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildTableBasedSource() {
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable")
-        .from("foo.com:project:somedataset.sometable");
+    BigQueryIO.Read.Bound bound = BigQueryIO.Read.from("foo.com:project:somedataset.sometable");
     checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
   }
 
   @Test
   public void testBuildQueryBasedSource() {
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyQuery")
-        .fromQuery("foo_query");
+    BigQueryIO.Read.Bound bound = BigQueryIO.Read.fromQuery("foo_query");
     checkReadQueryObject(bound, "foo_query");
   }
 
@@ -409,8 +406,8 @@ public class BigQueryIOTest implements Serializable {
   public void testBuildTableBasedSourceWithoutValidation() {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable")
-        .from("foo.com:project:somedataset.sometable").withoutValidation();
+    BigQueryIO.Read.Bound bound =
+        BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation();
     checkReadTableObjectWithValidate(bound, "foo.com:project", "somedataset", "sometable", false);
   }
 
@@ -418,15 +415,15 @@ public class BigQueryIOTest implements Serializable {
   public void testBuildQueryBasedSourceWithoutValidation() {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable")
-        .fromQuery("some_query").withoutValidation();
+    BigQueryIO.Read.Bound bound =
+        BigQueryIO.Read.fromQuery("some_query").withoutValidation();
     checkReadQueryObjectWithValidate(bound, "some_query", false);
   }
 
   @Test
   public void testBuildTableBasedSourceWithDefaultProject() {
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable")
-        .from("somedataset.sometable");
+    BigQueryIO.Read.Bound bound =
+        BigQueryIO.Read.from("somedataset.sometable");
     checkReadTableObject(bound, null, "somedataset", "sometable");
   }
 
@@ -436,8 +433,7 @@ public class BigQueryIOTest implements Serializable {
         .setProjectId("foo.com:project")
         .setDatasetId("somedataset")
         .setTableId("sometable");
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable")
-        .from(table);
+    BigQueryIO.Read.Bound bound = BigQueryIO.Read.from(table);
     checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
   }
 
@@ -457,18 +453,7 @@ public class BigQueryIOTest implements Serializable {
     thrown.expectMessage(
         Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence"))
             .or(Matchers.containsString("BigQuery dataset not found for table")));
-    p.apply(BigQueryIO.Read.named("ReadMyTable").from(tableRef));
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testBuildSourceWithoutTableOrQuery() {
-    Pipeline p = TestPipeline.create();
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "Invalid BigQuery read operation, either table reference or query has to be set");
-    p.apply(BigQueryIO.Read.named("ReadMyTable"));
-    p.run();
+    p.apply(BigQueryIO.Read.from(tableRef));
   }
 
   @Test
@@ -490,8 +475,8 @@ public class BigQueryIOTest implements Serializable {
     thrown.expectMessage(
         "Invalid BigQuery read operation. Specifies both a query and a table, only one of these"
         + " should be provided");
-    p.apply(
-        BigQueryIO.Read.named("ReadMyTable")
+    p.apply("ReadMyTable",
+        BigQueryIO.Read
             .from("foo.com:project:somedataset.sometable")
             .fromQuery("query"));
     p.run();
@@ -505,8 +490,8 @@ public class BigQueryIOTest implements Serializable {
     thrown.expectMessage(
         "Invalid BigQuery read operation. Specifies a"
               + " table with a result flattening preference, which is not configurable");
-    p.apply(
-        BigQueryIO.Read.named("ReadMyTable")
+    p.apply("ReadMyTable",
+        BigQueryIO.Read
             .from("foo.com:project:somedataset.sometable")
             .withoutResultFlattening());
     p.run();
@@ -521,7 +506,7 @@ public class BigQueryIOTest implements Serializable {
         "Invalid BigQuery read operation. Specifies a"
               + " table with a result flattening preference, which is not configurable");
     p.apply(
-        BigQueryIO.Read.named("ReadMyTable")
+        BigQueryIO.Read
             .from("foo.com:project:somedataset.sometable")
             .withoutValidation()
             .withoutResultFlattening());
@@ -644,8 +629,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSink() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
-        .to("foo.com:project:somedataset.sometable");
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
     checkWriteObject(
         bound, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
@@ -655,8 +639,8 @@ public class BigQueryIOTest implements Serializable {
   public void testBuildSinkwithoutValidation() {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
-        .to("foo.com:project:somedataset.sometable").withoutValidation();
+    BigQueryIO.Write.Bound bound =
+        BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withoutValidation();
     checkWriteObjectWithValidate(
         bound, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, false);
@@ -664,8 +648,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSinkDefaultProject() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
-        .to("somedataset.sometable");
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("somedataset.sometable");
     checkWriteObject(
         bound, null, "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
@@ -677,8 +660,7 @@ public class BigQueryIOTest implements Serializable {
         .setProjectId("foo.com:project")
         .setDatasetId("somedataset")
         .setTableId("sometable");
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
-        .to(table);
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to(table);
     checkWriteObject(
         bound, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
@@ -691,14 +673,14 @@ public class BigQueryIOTest implements Serializable {
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("must set the table reference");
     p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()))
-        .apply(BigQueryIO.Write.named("WriteMyTable"));
+        .apply(BigQueryIO.Write.withoutValidation());
   }
 
   @Test
   public void testBuildSinkWithSchema() {
     TableSchema schema = new TableSchema();
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
-        .to("foo.com:project:somedataset.sometable").withSchema(schema);
+    BigQueryIO.Write.Bound bound =
+        BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withSchema(schema);
     checkWriteObject(
         bound, "foo.com:project", "somedataset", "sometable",
         schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
@@ -706,7 +688,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSinkWithCreateDispositionNever() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withCreateDisposition(CreateDisposition.CREATE_NEVER);
     checkWriteObject(
@@ -716,7 +698,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSinkWithCreateDispositionIfNeeded() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
     checkWriteObject(
@@ -726,7 +708,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSinkWithWriteDispositionTruncate() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
     checkWriteObject(
@@ -736,7 +718,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSinkWithWriteDispositionAppend() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withWriteDisposition(WriteDisposition.WRITE_APPEND);
     checkWriteObject(
@@ -746,7 +728,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSinkWithWriteDispositionEmpty() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withWriteDisposition(WriteDisposition.WRITE_EMPTY);
     checkWriteObject(
@@ -794,7 +776,7 @@ public class BigQueryIOTest implements Serializable {
         Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence"))
             .or(Matchers.containsString("BigQuery dataset not found for table")));
     p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()))
-     .apply(BigQueryIO.Write.named("WriteMyTable")
+     .apply(BigQueryIO.Write
          .to(tableRef)
          .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
          .withSchema(new TableSchema()));
@@ -878,8 +860,6 @@ public class BigQueryIOTest implements Serializable {
   public void testBigQueryIOGetName() {
     assertEquals("BigQueryIO.Read", BigQueryIO.Read.from("somedataset.sometable").getName());
     assertEquals("BigQueryIO.Write", BigQueryIO.Write.to("somedataset.sometable").getName());
-    assertEquals("ReadMyTable", BigQueryIO.Read.named("ReadMyTable").getName());
-    assertEquals("WriteMyTable", BigQueryIO.Write.named("WriteMyTable").getName());
   }
 
   @Test
@@ -915,7 +895,7 @@ public class BigQueryIOTest implements Serializable {
     thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform");
     TestPipeline.create()
         .apply(Create.<TableRow>of())
-        .apply(BigQueryIO.Write.named("name"));
+        .apply("name", BigQueryIO.Write.withoutValidation());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index b0c577d..c9f4079 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionE
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
 import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -717,7 +718,7 @@ public class FileBasedSourceTest {
     File file = createFileWithData(fileName, data);
 
     TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 64, null);
-    PCollection<String> output = p.apply(Read.from(source).named("ReadFileData"));
+    PCollection<String> output = p.apply("ReadFileData", Read.from(source));
 
     PAssert.that(output).containsInAnyOrder(data);
     p.run();
@@ -743,7 +744,7 @@ public class FileBasedSourceTest {
     TestFileBasedSource source =
         new TestFileBasedSource(new File(file1.getParent(), "file*").getPath(), 64, null);
 
-    PCollection<String> output = p.apply(Read.from(source).named("ReadFileData"));
+    PCollection<String> output = p.apply("ReadFileData", Read.from(source));
 
     List<String> expectedResults = new ArrayList<String>();
     expectedResults.addAll(data1);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index eaf452d..efa1cd2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -45,10 +45,6 @@ public class PubsubIOTest {
         PubsubIO.Read.topic("projects/myproject/topics/mytopic").getName());
     assertEquals("PubsubIO.Write",
         PubsubIO.Write.topic("projects/myproject/topics/mytopic").getName());
-    assertEquals("ReadMyTopic",
-        PubsubIO.Read.named("ReadMyTopic").topic("projects/myproject/topics/mytopic").getName());
-    assertEquals("WriteMyTopic",
-        PubsubIO.Write.named("WriteMyTopic").topic("projects/myproject/topics/mytopic").getName());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index c3a5084..df598c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -44,14 +45,12 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
 
 import com.google.common.collect.ImmutableList;
 
@@ -167,16 +166,9 @@ public class TextIOTest {
     }
 
     {
-      PCollection<String> output2 =
-          p.apply(TextIO.Read.named("MyRead").from(file));
+      PCollection<String> output2 = p.apply("MyRead", TextIO.Read.from(file));
       assertEquals("MyRead/Read.out", output2.getName());
     }
-
-    {
-      PCollection<String> output3 =
-          p.apply(TextIO.Read.from(file).named("HerRead"));
-      assertEquals("HerRead/Read.out", output3.getName());
-    }
   }
 
   @Test
@@ -299,27 +291,6 @@ public class TextIOTest {
   }
 
   @Test
-  public void testWriteNamed() {
-    {
-      PTransform<PCollection<String>, PDone> transform1 =
-        TextIO.Write.to("/tmp/file.txt");
-      assertEquals("TextIO.Write", transform1.getName());
-    }
-
-    {
-      PTransform<PCollection<String>, PDone> transform2 =
-          TextIO.Write.named("MyWrite").to("/tmp/file.txt");
-      assertEquals("MyWrite", transform2.getName());
-    }
-
-    {
-      PTransform<PCollection<String>, PDone> transform3 =
-          TextIO.Write.to("/tmp/file.txt").named("HerWrite");
-      assertEquals("HerWrite", transform3.getName());
-    }
-  }
-
-  @Test
   @Category(NeedsRunner.class)
   public void testShardedWrite() throws Exception {
     runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), 5);
@@ -620,12 +591,8 @@ public class TextIOTest {
   public void testTextIOGetName() {
     assertEquals("TextIO.Read", TextIO.Read.from("somefile").getName());
     assertEquals("TextIO.Write", TextIO.Write.to("somefile").getName());
-    assertEquals("ReadMyFile", TextIO.Read.named("ReadMyFile").from("somefile").getName());
-    assertEquals("WriteMyFile", TextIO.Write.named("WriteMyFile").to("somefile").getName());
 
     assertEquals("TextIO.Read", TextIO.Read.from("somefile").toString());
-    assertEquals(
-        "ReadMyFile [TextIO.Read]", TextIO.Read.named("ReadMyFile").from("somefile").toString());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
index eb65468..37e3881 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionE
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
@@ -582,7 +583,7 @@ public class XmlSourceTest {
             .withRecordClass(Train.class)
             .withMinBundleSize(1024);
 
-    PCollection<Train> output = p.apply(Read.from(source).named("ReadFileData"));
+    PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
 
     List<Train> expectedResults =
         ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null),
@@ -672,7 +673,7 @@ public class XmlSourceTest {
             .withRecordElement("train")
             .withRecordClass(Train.class)
             .withMinBundleSize(1024);
-    PCollection<Train> output = p.apply(Read.from(source).named("ReadFileData"));
+    PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
 
     PAssert.that(output).containsInAnyOrder(trains);
     p.run();
@@ -814,13 +815,13 @@ public class XmlSourceTest {
 
     Pipeline p = TestPipeline.create();
 
-    XmlSource<Train> source = XmlSource.<Train>from(file.getParent() + "/"
-        + "temp*.xml")
-                                  .withRootElement("trains")
-                                  .withRecordElement("train")
-                                  .withRecordClass(Train.class)
-                                  .withMinBundleSize(1024);
-    PCollection<Train> output = p.apply(Read.from(source).named("ReadFileData"));
+    XmlSource<Train> source =
+        XmlSource.<Train>from(file.getParent() + "/" + "temp*.xml")
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .withMinBundleSize(1024);
+    PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
 
     List<Train> expectedResults = new ArrayList<>();
     expectedResults.addAll(trains1);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 0c992c4..08c3996 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -120,9 +120,9 @@ public class TransformTreeTest {
 
     Pipeline p = TestPipeline.create();
 
-    p.apply(TextIO.Read.named("ReadMyFile").from(inputFile.getPath()))
+    p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath()))
         .apply(Sample.<String>any(10))
-        .apply(TextIO.Write.named("WriteMyFile").to(outputFile.getPath()));
+        .apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
 
     final EnumSet<TransformsSeen> visited =
         EnumSet.noneOf(TransformsSeen.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index c858f32..76bc038 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -119,11 +119,11 @@ public class WindowTest implements Serializable {
     FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25));
     WindowingStrategy<?, ?> strategy = TestPipeline.create()
         .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
-        .apply(Window.named("WindowInto10").<String>into(fixed10)
+        .apply("WindowInto10", Window.<String>into(fixed10)
             .withAllowedLateness(Duration.standardDays(1))
             .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5)))
             .accumulatingFiredPanes())
-        .apply(Window.named("WindowInto25").<String>into(fixed25))
+        .apply("WindowInto25", Window.<String>into(fixed25))
         .getWindowingStrategy();
 
     assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
@@ -272,7 +272,7 @@ public class WindowTest implements Serializable {
 
   @Test
   public void testDisplayDataExcludesUnspecifiedProperties() {
-    Window.Bound<?> onlyHasAccumulationMode = Window.named("foobar").discardingFiredPanes();
+    Window.Bound<?> onlyHasAccumulationMode = Window.discardingFiredPanes();
     assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf(
         "windowFn",
         "trigger",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 21f58df..c1e092a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -221,7 +221,7 @@ public class WindowingTest implements Serializable {
 
     Pipeline p = TestPipeline.create();
     PCollection<String> output = p.begin()
-        .apply(TextIO.Read.named("ReadLines").from(filename))
+        .apply("ReadLines", TextIO.Read.from(filename))
         .apply(ParDo.of(new ExtractWordsWithTimestampsFn()))
         .apply(new WindowedCount(FixedWindows.of(Duration.millis(10))));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
index 3306cb4..c0e5b17 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
@@ -174,7 +174,7 @@ public class DebuggingWordCount {
     Pipeline p = Pipeline.create(options);
 
     PCollection<KV<String, Long>> filteredWords =
-        p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+        p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
          .apply(new WordCount.CountWords())
          .apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
index 07ed6d0..803e800 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -195,10 +195,10 @@ public class WordCount {
 
     // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
     // static FormatAsTextFn() to the ParDo transform.
-    p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
      .apply(new CountWords())
      .apply(ParDo.of(new FormatAsTextFn()))
-     .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
+     .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
 
     p.run();
   }


[3/3] incubator-beam git commit: This closes #529

Posted by bc...@apache.org.
This closes #529


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9abd0926
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9abd0926
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9abd0926

Branch: refs/heads/master
Commit: 9abd0926ac471c178eb30eeb01c964f9ecbd9cce
Parents: 4f580f5 fc52a10
Author: bchambers <bc...@google.com>
Authored: Mon Jun 27 10:29:31 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Mon Jun 27 10:29:31 2016 -0700

----------------------------------------------------------------------
 .../beam/examples/DebuggingWordCount.java       |   2 +-
 .../org/apache/beam/examples/WordCount.java     |   4 +-
 .../apache/beam/examples/complete/TfIdf.java    |   3 +-
 .../examples/complete/TopWikipediaSessions.java |   2 +-
 .../examples/cookbook/DatastoreWordCount.java   |   2 +-
 .../beam/examples/cookbook/DeDupExample.java    |   5 +-
 .../beam/examples/cookbook/TriggerExample.java  |   4 +-
 .../beam/examples/complete/game/GameStats.java  |  28 ++-
 .../examples/complete/game/HourlyTeamScore.java |   5 +-
 .../examples/complete/game/LeaderBoard.java     |   8 +-
 .../beam/runners/flink/examples/TFIDF.java      |   3 +-
 .../beam/runners/flink/examples/WordCount.java  |   4 +-
 .../flink/examples/streaming/AutoComplete.java  |   9 +-
 .../flink/examples/streaming/JoinExamples.java  |  13 +-
 .../KafkaWindowedWordCountExample.java          |   2 +-
 .../examples/streaming/WindowedWordCount.java   |   3 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   2 +-
 .../DataflowPipelineTranslatorTest.java         |   6 +-
 .../runners/dataflow/DataflowRunnerTest.java    |  18 +-
 .../beam/runners/spark/SimpleWordCountTest.java |   3 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     |  53 +----
 .../java/org/apache/beam/sdk/io/BigQueryIO.java |  43 +----
 .../java/org/apache/beam/sdk/io/PubsubIO.java   |  35 +---
 .../main/java/org/apache/beam/sdk/io/Read.java  |  29 +--
 .../java/org/apache/beam/sdk/io/TextIO.java     |  55 +-----
 .../org/apache/beam/sdk/io/package-info.java    |   6 +-
 .../beam/sdk/transforms/windowing/Window.java   |  42 ----
 .../beam/sdk/io/AvroIOGeneratedClassTest.java   | 192 +++++--------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |   5 +-
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  |  82 +++-----
 .../apache/beam/sdk/io/FileBasedSourceTest.java |   5 +-
 .../org/apache/beam/sdk/io/PubsubIOTest.java    |   4 -
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  37 +---
 .../org/apache/beam/sdk/io/XmlSourceTest.java   |  19 +-
 .../beam/sdk/runners/TransformTreeTest.java     |   4 +-
 .../sdk/transforms/windowing/WindowTest.java    |   6 +-
 .../sdk/transforms/windowing/WindowingTest.java |   2 +-
 .../src/main/java/DebuggingWordCount.java       |   2 +-
 .../src/main/java/WordCount.java                |   4 +-
 39 files changed, 183 insertions(+), 568 deletions(-)
----------------------------------------------------------------------