You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/06/27 17:31:07 UTC
[1/3] incubator-beam git commit: Remove many definitions of named
methods
Repository: incubator-beam
Updated Branches:
refs/heads/master 4f580f5f1 -> 9abd0926a
Remove many definitions of named methods
Specifically, remove the occurrences in:
- Window
- AvroIO
- PubsubIO
- TextIO
- BigQueryIO
- Read
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fc52a102
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fc52a102
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fc52a102
Branch: refs/heads/master
Commit: fc52a10259cd045f4b55ec59b2ae87c02c926ed4
Parents: 5719535
Author: Ben Chambers <bc...@google.com>
Authored: Thu Jun 23 17:55:24 2016 -0700
Committer: Ben Chambers <bc...@google.com>
Committed: Sun Jun 26 10:06:35 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 53 ++++---------------
.../java/org/apache/beam/sdk/io/BigQueryIO.java | 42 +--------------
.../java/org/apache/beam/sdk/io/PubsubIO.java | 35 +------------
.../main/java/org/apache/beam/sdk/io/Read.java | 29 +----------
.../java/org/apache/beam/sdk/io/TextIO.java | 55 ++++----------------
.../org/apache/beam/sdk/io/package-info.java | 6 +--
.../beam/sdk/transforms/windowing/Window.java | 42 ---------------
7 files changed, 25 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc52a102/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 4b40c01..604051b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -55,9 +55,7 @@ import javax.annotation.Nullable;
* {@link AvroIO.Read}, specifying {@link AvroIO.Read#from} to specify
* the path of the file(s) to read from (e.g., a local filename or
* filename pattern if running locally, or a Google Cloud Storage
- * filename or filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}), and optionally
- * {@link AvroIO.Read#named} to specify the name of the pipeline step.
+ * filename or filename pattern of the form {@code "gs://<bucket>/<filepath>"}).
*
* <p>It is required to specify {@link AvroIO.Read#withSchema}. To
* read specific records, such as Avro-generated classes, provide an
@@ -73,15 +71,15 @@ import javax.annotation.Nullable;
* // A simple Read of a local file (only runs locally):
* PCollection<AvroAutoGenClass> records =
* p.apply(AvroIO.Read.from("/path/to/file.avro")
- * .withSchema(AvroAutoGenClass.class));
+ * .withSchema(AvroAutoGenClass.class));
*
* // A Read from a GCS file (runs locally and via the Google Cloud
* // Dataflow service):
* Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
* PCollection<GenericRecord> records =
- * p.apply(AvroIO.Read.named("ReadFromAvro")
- * .from("gs://my_bucket/path/to/records-*.avro")
- * .withSchema(schema));
+ * p.apply(AvroIO.Read
+ * .from("gs://my_bucket/path/to/records-*.avro")
+ * .withSchema(schema));
* } </pre>
*
* <p>To write a {@link PCollection} to one or more Avro files, use
@@ -110,10 +108,10 @@ import javax.annotation.Nullable;
* // Dataflow service):
* Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
* PCollection<GenericRecord> records = ...;
- * records.apply(AvroIO.Write.named("WriteToAvro")
- * .to("gs://my_bucket/path/to/numbers")
- * .withSchema(schema)
- * .withSuffix(".avro"));
+ * records.apply("WriteToAvro", AvroIO.Write
+ * .to("gs://my_bucket/path/to/numbers")
+ * .withSchema(schema)
+ * .withSuffix(".avro"));
* } </pre>
*
* <p><h3>Permissions</h3>
@@ -128,12 +126,6 @@ public class AvroIO {
* the decoding of each record.
*/
public static class Read {
- /**
- * Returns a {@link PTransform} with the given step name.
- */
- public static Bound<GenericRecord> named(String name) {
- return new Bound<>(GenericRecord.class).named(name);
- }
/**
* Returns a {@link PTransform} that reads from the file(s)
@@ -223,16 +215,6 @@ public class AvroIO {
/**
* Returns a new {@link PTransform} that's like this one but
- * with the given step name.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> named(String name) {
- return new Bound<>(name, filepattern, type, schema, validate);
- }
-
- /**
- * Returns a new {@link PTransform} that's like this one but
* that reads from the file(s) with the given name or pattern.
* (See {@link AvroIO.Read#from} for a description of
* filepatterns.)
@@ -366,12 +348,6 @@ public class AvroIO {
* multiple Avro files matching a sharding pattern).
*/
public static class Write {
- /**
- * Returns a {@link PTransform} with the given step name.
- */
- public static Bound<GenericRecord> named(String name) {
- return new Bound<>(GenericRecord.class).named(name);
- }
/**
* Returns a {@link PTransform} that writes to the file(s)
@@ -522,17 +498,6 @@ public class AvroIO {
/**
* Returns a new {@link PTransform} that's like this one but
- * with the given step name.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> named(String name) {
- return new Bound<>(
- name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate);
- }
-
- /**
- * Returns a new {@link PTransform} that's like this one but
* that writes to the file(s) with the given filename prefix.
*
* <p>See {@link AvroIO.Write#to(String)} for more information
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc52a102/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 7cac705..a9d85b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -162,8 +162,7 @@ import javax.annotation.Nullable;
* This produces a {@link PCollection} of {@link TableRow TableRows} as output:
* <pre>{@code
* PCollection<TableRow> shakespeare = pipeline.apply(
- * BigQueryIO.Read.named("Read")
- * .from("clouddataflow-readonly:samples.weather_stations"));
+ * BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
* }</pre>
*
* <p>See {@link TableRow} for more information on the {@link TableRow} object.
@@ -174,8 +173,7 @@ import javax.annotation.Nullable;
*
* <pre>{@code
* PCollection<TableRow> shakespeare = pipeline.apply(
- * BigQueryIO.Read.named("Read")
- * .fromQuery("SELECT year, mean_temp FROM samples.weather_stations"));
+ * BigQueryIO.Read.fromQuery("SELECT year, mean_temp FROM samples.weather_stations"));
* }</pre>
*
* <p>When creating a BigQuery input transform, users should provide either a query or a table.
@@ -193,7 +191,6 @@ import javax.annotation.Nullable;
* TableSchema schema = new TableSchema().setFields(fields);
*
* quotes.apply(BigQueryIO.Write
- * .named("Write")
* .to("my-project:output.output_table")
* .withSchema(schema)
* .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
@@ -214,7 +211,6 @@ import javax.annotation.Nullable;
* PCollection<TableRow> quotes = ...
* quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
* .apply(BigQueryIO.Write
- * .named("Write")
* .withSchema(schema)
* .to(new SerializableFunction<BoundedWindow, String>() {
* public String apply(BoundedWindow window) {
@@ -345,13 +341,6 @@ public class BigQueryIO {
* }}</pre>
*/
public static class Read {
- /**
- * Returns a {@link Read.Bound} with the given name. The BigQuery table or query to be read
- * from has not yet been configured.
- */
- public static Bound named(String name) {
- return new Bound().named(name);
- }
/**
* Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or
@@ -429,15 +418,6 @@ public class BigQueryIO {
}
/**
- * Returns a copy of this transform using the name associated with this transformation.
- *
- * <p>Does not modify this object.
- */
- public Bound named(String name) {
- return new Bound(name, query, jsonTableRef, validate, flattenResults, testBigQueryServices);
- }
-
- /**
* Returns a copy of this transform that reads from the specified table. Refer to
* {@link #parseTableSpec(String)} for the specification format.
*
@@ -1372,14 +1352,6 @@ public class BigQueryIO {
}
/**
- * Creates a write transformation with the given transform name. The BigQuery table to be
- * written has not yet been configured.
- */
- public static Bound named(String name) {
- return new Bound().named(name);
- }
-
- /**
* Creates a write transformation for the given table specification.
*
* <p>Refer to {@link #parseTableSpec(String)} for the specification format.
@@ -1522,16 +1494,6 @@ public class BigQueryIO {
}
/**
- * Returns a copy of this write transformation, but with the specified transform name.
- *
- * <p>Does not modify this object.
- */
- public Bound named(String name) {
- return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, validate, testBigQueryServices);
- }
-
- /**
* Returns a copy of this write transformation, but writing to the specified table. Refer to
* {@link #parseTableSpec(String)} for the specification format.
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc52a102/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index c6de8b4..ecb1f0a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -55,6 +55,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+
import javax.annotation.Nullable;
/**
@@ -369,13 +370,6 @@ public class PubsubIO {
* {@link Bound#maxNumRecords(int)} or {@link Bound#maxReadTime(Duration)} must be set.
*/
public static class Read {
- /**
- * Creates and returns a transform for reading from Cloud Pub/Sub with the specified transform
- * name.
- */
- public static Bound<String> named(String name) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).named(name);
- }
/**
* Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
@@ -531,16 +525,6 @@ public class PubsubIO {
}
/**
- * Returns a transform that's like this one but with the given step name.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> named(String name) {
- return new Bound<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
- }
-
- /**
* Returns a transform that's like this one but reading from the
* given subscription.
*
@@ -834,13 +818,6 @@ public class PubsubIO {
// TODO: Support non-String encodings.
public static class Write {
/**
- * Creates a transform that writes to Pub/Sub with the given step name.
- */
- public static Bound<String> named(String name) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).named(name);
- }
-
- /**
* Creates a transform that publishes to the specified topic.
*
* <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
@@ -917,16 +894,6 @@ public class PubsubIO {
}
/**
- * Returns a new transform that's like this one but with the specified step
- * name.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> named(String name) {
- return new Bound<>(name, topic, timestampLabel, idLabel, coder);
- }
-
- /**
* Returns a new transform that's like this one but that writes to the specified
* topic.
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc52a102/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index c0440f2..e13ff06 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -38,17 +38,10 @@ import javax.annotation.Nullable;
* <p>Usage example:
* <pre>
* Pipeline p = Pipeline.create();
- * p.apply(Read.from(new MySource().withFoo("foo").withBar("bar"))
- * .named("foobar"));
+ * p.apply(Read.from(new MySource().withFoo("foo").withBar("bar")));
* </pre>
*/
public class Read {
- /**
- * Returns a new {@code Read} {@code PTransform} builder with the given name.
- */
- public static Builder named(String name) {
- return new Builder(name);
- }
/**
* Returns a new {@code Read.Bounded} {@code PTransform} reading from the given
@@ -104,16 +97,6 @@ public class Read {
this.source = SerializableUtils.ensureSerializable(source);
}
- /**
- * Returns a new {@code Bounded} {@code PTransform} that's like this one but
- * has the given name.
- *
- * <p>Does not modify this object.
- */
- public Bounded<T> named(String name) {
- return new Bounded<T>(name, source);
- }
-
@Override
protected Coder<T> getDefaultOutputCoder() {
return source.getDefaultOutputCoder();
@@ -162,16 +145,6 @@ public class Read {
}
/**
- * Returns a new {@code Unbounded} {@code PTransform} that's like this one but
- * has the given name.
- *
- * <p>Does not modify this object.
- */
- public Unbounded<T> named(String name) {
- return new Unbounded<T>(name, source);
- }
-
- /**
* Returns a new {@link BoundedReadFromUnboundedSource} that reads a bounded amount
* of data from the given {@link UnboundedSource}. The bound is specified as a number
* of records to read.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc52a102/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index a7e5e29..7e7a3e6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -58,8 +58,7 @@ import javax.annotation.Nullable;
* the path of the file(s) to read from (e.g., a local filename or
* filename pattern if running locally, or a Google Cloud Storage
* filename or filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). You may optionally call
- * {@link TextIO.Read#named(String)} to specify the name of the pipeline step.
+ * {@code "gs://<bucket>/<filepath>"}).
*
* <p>By default, {@link TextIO.Read} returns a {@link PCollection} of {@link String Strings},
* each corresponding to one line of an input UTF-8 text file. To convert directly from the raw
@@ -78,9 +77,9 @@ import javax.annotation.Nullable;
* // A fully-specified Read from a GCS file (runs locally and via the
* // Google Cloud Dataflow service):
* PCollection<Integer> numbers =
- * p.apply(TextIO.Read.named("ReadNumbers")
- * .from("gs://my_bucket/path/to/numbers-*.txt")
- * .withCoder(TextualIntegerCoder.of()));
+ * p.apply("ReadNumbers", TextIO.Read
+ * .from("gs://my_bucket/path/to/numbers-*.txt")
+ * .withCoder(TextualIntegerCoder.of()));
* }</pre>
*
* <p>To write a {@link PCollection} to one or more text files, use
@@ -88,9 +87,8 @@ import javax.annotation.Nullable;
* the path of the file to write to (e.g., a local filename or sharded
* filename pattern if running locally, or a Google Cloud Storage
* filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). You can optionally name the resulting transform using
- * {@link TextIO.Write#named(String)}, and you can use {@link TextIO.Write#withCoder(Coder)}
- * to specify the Coder to use to encode the Java values into text lines.
+ * {@code "gs://<bucket>/<filepath>"}). You can use {@link TextIO.Write#withCoder(Coder)}
+ * to specify the {@link Coder} to use to encode the Java values into text lines.
*
* <p>Any existing files with the same names as generated output files
* will be overwritten.
@@ -104,10 +102,10 @@ import javax.annotation.Nullable;
* // A fully-specified Write to a sharded GCS file (runs locally and via the
* // Google Cloud Dataflow service):
* PCollection<Integer> numbers = ...;
- * numbers.apply(TextIO.Write.named("WriteNumbers")
- * .to("gs://my_bucket/path/to/numbers")
- * .withSuffix(".txt")
- * .withCoder(TextualIntegerCoder.of()));
+ * numbers.apply("WriteNumbers", TextIO.Write
+ * .to("gs://my_bucket/path/to/numbers")
+ * .withSuffix(".txt")
+ * .withCoder(TextualIntegerCoder.of()));
* }</pre>
*
* <h3>Permissions</h3>
@@ -130,12 +128,6 @@ public class TextIO {
* {@link #withCoder(Coder)} to change the return type.
*/
public static class Read {
- /**
- * Returns a transform for reading text files that uses the given step name.
- */
- public static Bound<String> named(String name) {
- return new Bound<>(DEFAULT_TEXT_CODER).named(name);
- }
/**
* Returns a transform for reading text files that reads from the file(s)
@@ -228,16 +220,6 @@ public class TextIO {
/**
* Returns a new transform for reading from text files that's like this one but
- * with the given step name.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> named(String name) {
- return new Bound<>(name, filepattern, coder, validate, compressionType);
- }
-
- /**
- * Returns a new transform for reading from text files that's like this one but
* that reads from the file(s) with the given name or pattern. See {@link TextIO.Read#from}
* for a description of filepatterns.
*
@@ -387,12 +369,6 @@ public class TextIO {
* element of the input collection encoded into its own line.
*/
public static class Write {
- /**
- * Returns a transform for writing to text files with the given step name.
- */
- public static Bound<String> named(String name) {
- return new Bound<>(DEFAULT_TEXT_CODER).named(name);
- }
/**
* Returns a transform for writing to text files that writes to the file(s)
@@ -521,17 +497,6 @@ public class TextIO {
/**
* Returns a transform for writing to text files that's like this one but
- * with the given step name.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> named(String name) {
- return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
- shardTemplate, validate);
- }
-
- /**
- * Returns a transform for writing to text files that's like this one but
* that writes to the file(s) with the given filename prefix.
*
* <p>See {@link TextIO.Write#to(String) Write.to(String)} for more information.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc52a102/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
index c2c0685..432c5df 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
@@ -25,14 +25,12 @@
* from existing storage:
* <pre>{@code
* PCollection<TableRow> inputData = pipeline.apply(
- * BigQueryIO.Read.named("Read")
- * .from("clouddataflow-readonly:samples.weather_stations");
+ * BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
* }</pre>
* and {@code Write} transforms that persist PCollections to external storage:
* <pre> {@code
* PCollection<Integer> numbers = ...;
- * numbers.apply(TextIO.Write.named("WriteNumbers")
- * .to("gs://my_bucket/path/to/numbers"));
+ * numbers.apply(TextIO.Write.to("gs://my_bucket/path/to/numbers"));
* } </pre>
*/
package org.apache.beam.sdk.io;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc52a102/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index dde5c05..bc122e2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -159,21 +159,6 @@ public class Window {
}
/**
- * Creates a {@code Window} {@code PTransform} with the given name.
- *
- * <p>See the discussion of Naming in
- * {@link org.apache.beam.sdk.transforms.ParDo} for more explanation.
- *
- * <p>The resulting {@code PTransform} is incomplete, and its input/output
- * type is not yet bound. Use {@link Window.Unbound#into} to specify the
- * {@link WindowFn} to use, which will also bind the input/output type of this
- * {@code PTransform}.
- */
- public static Unbound named(String name) {
- return new Unbound().named(name);
- }
-
- /**
* Creates a {@code Window} {@code PTransform} that uses the given
* {@link WindowFn} to window the data.
*
@@ -255,19 +240,6 @@ public class Window {
}
/**
- * Returns a new {@code Window} transform that's like this
- * transform but with the specified name. Does not modify this
- * transform. The resulting transform is still incomplete.
- *
- * <p>See the discussion of Naming in
- * {@link org.apache.beam.sdk.transforms.ParDo} for more
- * explanation.
- */
- public Unbound named(String name) {
- return new Unbound(name);
- }
-
- /**
* Returns a new {@code Window} {@code PTransform} that's like this
* transform but that will use the given {@link WindowFn}, and that has
* its input and output types bound. Does not modify this transform. The
@@ -408,20 +380,6 @@ public class Window {
}
/**
- * Returns a new {@code Window} {@code PTransform} that's like this
- * {@code PTransform} but with the specified name. Does not
- * modify this {@code PTransform}.
- *
- * <p>See the discussion of Naming in
- * {@link org.apache.beam.sdk.transforms.ParDo} for more
- * explanation.
- */
- public Bound<T> named(String name) {
- return new Bound<>(
- name, windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn);
- }
-
- /**
* Sets a non-default trigger for this {@code Window} {@code PTransform}.
* Elements that are assigned to a specific window will be output when
* the trigger fires.
[2/3] incubator-beam git commit: Remove many uses of .named methods
Posted by bc...@apache.org.
Remove many uses of .named methods
Specifically, remove uses of:
- Window.named
- AvroIO.named
- PubSubIO.named
- TextIO.named
- BigQueryIO.named
- Read.named
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/57195358
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/57195358
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/57195358
Branch: refs/heads/master
Commit: 57195358592548e6f7e05bc8e4e292b126a726c5
Parents: 4f580f5
Author: Ben Chambers <bc...@google.com>
Authored: Thu Jun 23 22:27:05 2016 -0700
Committer: Ben Chambers <bc...@google.com>
Committed: Sun Jun 26 10:06:35 2016 -0700
----------------------------------------------------------------------
.../beam/examples/DebuggingWordCount.java | 2 +-
.../org/apache/beam/examples/WordCount.java | 4 +-
.../apache/beam/examples/complete/TfIdf.java | 3 +-
.../examples/complete/TopWikipediaSessions.java | 2 +-
.../examples/cookbook/DatastoreWordCount.java | 2 +-
.../beam/examples/cookbook/DeDupExample.java | 5 +-
.../beam/examples/cookbook/TriggerExample.java | 4 +-
.../beam/examples/complete/game/GameStats.java | 28 ++-
.../examples/complete/game/HourlyTeamScore.java | 5 +-
.../examples/complete/game/LeaderBoard.java | 8 +-
.../beam/runners/flink/examples/TFIDF.java | 3 +-
.../beam/runners/flink/examples/WordCount.java | 4 +-
.../flink/examples/streaming/AutoComplete.java | 9 +-
.../flink/examples/streaming/JoinExamples.java | 13 +-
.../KafkaWindowedWordCountExample.java | 2 +-
.../examples/streaming/WindowedWordCount.java | 3 +-
.../beam/runners/dataflow/DataflowRunner.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 6 +-
.../runners/dataflow/DataflowRunnerTest.java | 18 +-
.../beam/runners/spark/SimpleWordCountTest.java | 3 +-
.../java/org/apache/beam/sdk/io/BigQueryIO.java | 1 -
.../beam/sdk/io/AvroIOGeneratedClassTest.java | 192 +++++--------------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 5 +-
.../org/apache/beam/sdk/io/BigQueryIOTest.java | 82 +++-----
.../apache/beam/sdk/io/FileBasedSourceTest.java | 5 +-
.../org/apache/beam/sdk/io/PubsubIOTest.java | 4 -
.../java/org/apache/beam/sdk/io/TextIOTest.java | 37 +---
.../org/apache/beam/sdk/io/XmlSourceTest.java | 19 +-
.../beam/sdk/runners/TransformTreeTest.java | 4 +-
.../sdk/transforms/windowing/WindowTest.java | 6 +-
.../sdk/transforms/windowing/WindowingTest.java | 2 +-
.../src/main/java/DebuggingWordCount.java | 2 +-
.../src/main/java/WordCount.java | 4 +-
33 files changed, 158 insertions(+), 331 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 85823c2..8d85d44 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -173,7 +173,7 @@ public class DebuggingWordCount {
Pipeline p = Pipeline.create(options);
PCollection<KV<String, Long>> filteredWords =
- p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+ p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
.apply(new WordCount.CountWords())
.apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index cf6c45a..af16c44 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -205,10 +205,10 @@ public class WordCount {
// Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
// static FormatAsTextFn() to the ParDo transform.
- p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+ p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
- .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
+ .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
p.run();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 23653d6..8305314 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -187,8 +187,7 @@ public class TfIdf {
}
PCollection<KV<URI, String>> oneUriToLines = pipeline
- .apply(TextIO.Read.from(uriString)
- .named("TextIO.Read(" + uriString + ")"))
+ .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString))
.apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
urisToLines = urisToLines.and(oneUriToLines);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index 5d95e3f..80b3ade 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -217,7 +217,7 @@ public class TopWikipediaSessions {
.from(options.getInput())
.withCoder(TableRowJsonCoder.of()))
.apply(new ComputeTopSessions(samplingThreshold))
- .apply(TextIO.Write.named("Write").withoutSharding().to(options.getOutput()));
+ .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput()));
p.run();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
index 7578d79..b070f94 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
@@ -193,7 +193,7 @@ public class DatastoreWordCount {
*/
public static void writeDataToDatastore(Options options) {
Pipeline p = Pipeline.create(options);
- p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+ p.apply("ReadLines", TextIO.Read.from(options.getInput()))
.apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind())))
.apply(DatastoreIO.writeTo(options.getProject()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
index db65543..d573bcd 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
@@ -89,10 +89,9 @@ public class DeDupExample {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);
- p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+ p.apply("ReadLines", TextIO.Read.from(options.getInput()))
.apply(RemoveDuplicates.<String>create())
- .apply(TextIO.Write.named("DedupedShakespeare")
- .to(options.getOutput()));
+ .apply("DedupedShakespeare", TextIO.Write.to(options.getOutput()));
p.run();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index ab1fb66..ff4909b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -467,7 +467,7 @@ public class TriggerExample {
TableReference tableRef = getTableReference(options.getProject(),
options.getBigQueryDataset(), options.getBigQueryTable());
- PCollectionList<TableRow> resultList = pipeline.apply(PubsubIO.Read.named("ReadPubsubInput")
+ PCollectionList<TableRow> resultList = pipeline.apply("ReadPubsubInput", PubsubIO.Read
.timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
.topic(options.getPubsubTopic()))
.apply(ParDo.of(new ExtractFlowInfo()))
@@ -493,7 +493,7 @@ public class TriggerExample {
copiedOptions.setJobName(options.getJobName() + "-injector");
Pipeline injectorPipeline = Pipeline.create(copiedOptions);
injectorPipeline
- .apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
+ .apply("ReadMyFile", TextIO.Read.from(options.getInput()))
.apply("InsertRandomDelays", ParDo.of(new InsertDelays()))
.apply(IntraBundleParallelization.of(PubsubFileInjector
.withTimestampLabelKey(PUBSUB_TIMESTAMP_LABEL_KEY)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index ad8b49e..b1cb312 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -260,10 +260,8 @@ public class GameStats extends LeaderBoard {
// Calculate the total score per user over fixed windows, and
// cumulative updates for late data.
final PCollectionView<Map<String, Integer>> spammersView = userEvents
- .apply(Window.named("FixedWindowsUser")
- .<KV<String, Integer>>into(FixedWindows.of(
- Duration.standardMinutes(options.getFixedWindowDuration())))
- )
+ .apply("FixedWindowsUser", Window.<KV<String, Integer>>into(
+ FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
// Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
// These might be robots/spammers.
@@ -278,10 +276,8 @@ public class GameStats extends LeaderBoard {
// suspected robots-- to filter out scores from those users from the sum.
// Write the results to BigQuery.
rawEvents
- .apply(Window.named("WindowIntoFixedWindows")
- .<GameActionInfo>into(FixedWindows.of(
- Duration.standardMinutes(options.getFixedWindowDuration())))
- )
+ .apply("WindowIntoFixedWindows", Window.<GameActionInfo>into(
+ FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
// Filter out the detected spammer users, using the side input derived above.
.apply("FilterOutSpammers", ParDo
.withSideInputs(spammersView)
@@ -299,8 +295,8 @@ public class GameStats extends LeaderBoard {
// [END DocInclude_FilterAndCalc]
// Write the result to BigQuery
.apply("WriteTeamSums",
- new WriteWindowedToBigQuery<KV<String, Integer>>(
- options.getTablePrefix() + "_team", configureWindowedWrite()));
+ new WriteWindowedToBigQuery<KV<String, Integer>>(
+ options.getTablePrefix() + "_team", configureWindowedWrite()));
// [START DocInclude_SessionCalc]
@@ -309,10 +305,9 @@ public class GameStats extends LeaderBoard {
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
- .apply(Window.named("WindowIntoSessions")
- .<KV<String, Integer>>into(
- Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
- .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
+ .apply("WindowIntoSessions", Window.<KV<String, Integer>>into(
+ Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
+ .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
// For this use, we care only about the existence of the session, not any particular
// information aggregated over it, so the following is an efficient way to do that.
.apply(Combine.perKey(x -> 0))
@@ -321,9 +316,8 @@ public class GameStats extends LeaderBoard {
// [END DocInclude_SessionCalc]
// [START DocInclude_Rewindow]
// Re-window to process groups of session sums according to when the sessions complete.
- .apply(Window.named("WindowToExtractSessionMean")
- .<Integer>into(
- FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
+ .apply("WindowToExtractSessionMean", Window.<Integer>into(
+ FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index 7a808ac..e489607 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -178,9 +178,8 @@ public class HourlyTeamScore extends UserScore {
// Add an element timestamp based on the event log, and apply fixed windowing.
.apply("AddEventTimestamps",
WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
- .apply(Window.named("FixedWindowsTeam")
- .<GameActionInfo>into(FixedWindows.of(
- Duration.standardMinutes(options.getWindowDuration()))))
+ .apply("FixedWindowsTeam", Window.<GameActionInfo>into(
+ FixedWindows.of(Duration.standardMinutes(options.getWindowDuration()))))
// [END DocInclude_HTSAddTsAndWindow]
// Extract and sum teamname/score pairs from the event data.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 2c608aa..a14d533 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -190,9 +190,8 @@ public class LeaderBoard extends HourlyTeamScore {
// [START DocInclude_WindowAndTrigger]
// Extract team/score pairs from the event stream, using hour-long windows by default.
gameEvents
- .apply(Window.named("LeaderboardTeamFixedWindows")
- .<GameActionInfo>into(FixedWindows.of(
- Duration.standardMinutes(options.getTeamWindowDuration())))
+ .apply("LeaderboardTeamFixedWindows", Window.<GameActionInfo>into(
+ FixedWindows.of(Duration.standardMinutes(options.getTeamWindowDuration())))
// We will get early (speculative) results as well as cumulative
// processing of late data.
.triggering(
@@ -215,8 +214,7 @@ public class LeaderBoard extends HourlyTeamScore {
// Extract user/score pairs from the event stream using processing time, via global windowing.
// Get periodic updates on all users' running scores.
gameEvents
- .apply(Window.named("LeaderboardUserGlobalWindow")
- .<GameActionInfo>into(new GlobalWindows())
+ .apply("LeaderboardUserGlobalWindow", Window.<GameActionInfo>into(new GlobalWindows())
// Get periodic results every ten minutes.
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(TEN_MINUTES)))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 084ac7c..56737a4 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -191,8 +191,7 @@ public class TFIDF {
}
PCollection<KV<URI, String>> oneUriToLines = pipeline
- .apply(TextIO.Read.from(uriString)
- .named("TextIO.Read(" + uriString + ")"))
+ .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString))
.apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
urisToLines = urisToLines.and(oneUriToLines);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index 2817622..2d95c97 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -109,10 +109,10 @@ public class WordCount {
Pipeline p = Pipeline.create(options);
- p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+ p.apply("ReadLines", TextIO.Read.from(options.getInput()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
- .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
+ .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
p.run();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index ed11781..c0ff85d 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -44,7 +44,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -380,14 +379,14 @@ public class AutoComplete {
options.setExecutionRetryDelay(3000L);
options.setRunner(FlinkRunner.class);
- PTransform<? super PBegin, PCollection<String>> readSource =
- Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream");
- WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+
+ WindowFn<Object, ?> windowFn =
+ FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
// Create the pipeline.
Pipeline p = Pipeline.create(options);
PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
- .apply(readSource)
+ .apply("WordStream", Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
.apply(ParDo.of(new ExtractWordsFn()))
.apply(Window.<String>into(windowFn)
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index 0828ddc..f456b27 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -34,7 +33,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
@@ -135,22 +133,19 @@ public class JoinExamples {
options.setExecutionRetryDelay(3000L);
options.setRunner(FlinkRunner.class);
- PTransform<? super PBegin, PCollection<String>> readSourceA =
- Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream");
- PTransform<? super PBegin, PCollection<String>> readSourceB =
- Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream");
-
WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
Pipeline p = Pipeline.create(options);
// the following two 'applys' create multiple inputs to our pipeline, one for each
// of our two input sources.
- PCollection<String> streamA = p.apply(readSourceA)
+ PCollection<String> streamA = p
+ .apply("FirstStream", Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
.apply(Window.<String>into(windowFn)
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
- PCollection<String> streamB = p.apply(readSourceB)
+ PCollection<String> streamB = p
+ .apply("SecondStream", Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)))
.apply(Window.<String>into(windowFn)
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index b14c5ae..4e81420 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -132,7 +132,7 @@ public class KafkaWindowedWordCountExample {
new SimpleStringSchema(), p);
PCollection<String> words = pipeline
- .apply(Read.named("StreamingWordCount").from(UnboundedFlinkSource.of(kafkaConsumer)))
+ .apply("StreamingWordCount", Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
.apply(ParDo.of(new ExtractWordsFn()))
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index f72b705..1b532a7 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -119,7 +119,8 @@ public class WindowedWordCount {
Pipeline pipeline = Pipeline.create(options);
PCollection<String> words = pipeline
- .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount"))
+ .apply("StreamingWordCount",
+ Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
.apply(ParDo.of(new ExtractWordsFn()))
.apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize()))
.every(Duration.standardSeconds(options.getSlide())))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index d47d285..7ff247a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2686,7 +2686,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
try {
Coder<T> coder = transform.getDefaultOutputCoder(input);
return Pipeline.applyTransform(
- input, PubsubIO.Read.named("StartingSignal").subscription("_starting_signal/"))
+ "StartingSignal", input, PubsubIO.Read.subscription("_starting_signal/"))
.apply(ParDo.of(new OutputNullKv()))
.apply("GlobalSingleton", Window.<KV<Void, Void>>into(new GlobalWindows())
.triggering(AfterPane.elementCountAtLeast(1))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index c3a6a11..e04a1fc 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -127,8 +127,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
options.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(options);
- p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
- .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
+ p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object"))
+ .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
return p;
}
@@ -458,7 +458,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
// Create a pipeline that the predefined step will be embedded into
Pipeline pipeline = Pipeline.create(options);
- pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
+ pipeline.apply("ReadMyFile", TextIO.Read.from("gs://bucket/in"))
.apply(ParDo.of(new NoOpFn()))
.apply(new EmbeddedTransform(predefinedStep.clone()))
.apply(ParDo.of(new NoOpFn()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index e094d0d..999dc3a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow;
import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -151,8 +152,8 @@ public class DataflowRunnerTest {
options.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(options);
- p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
- .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
+ p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object"))
+ .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
return p;
}
@@ -464,7 +465,7 @@ public class DataflowRunnerTest {
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
- p.apply(TextIO.Read.named("ReadMyNonGcsFile").from(tmpFolder.newFile().getPath()));
+ p.apply("ReadMyNonGcsFile", TextIO.Read.from(tmpFolder.newFile().getPath()));
thrown.expectCause(Matchers.allOf(
instanceOf(IllegalArgumentException.class),
@@ -477,11 +478,11 @@ public class DataflowRunnerTest {
@Test
public void testNonGcsFilePathInWriteFailure() throws IOException {
Pipeline p = buildDataflowPipeline(buildPipelineOptions());
- PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
+ PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object"));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(containsString("expected a valid 'gs://' path but was given"));
- pc.apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file"));
+ pc.apply("WriteMyNonGcsFile", TextIO.Write.to("/tmp/file"));
}
@Test
@@ -489,8 +490,7 @@ public class DataflowRunnerTest {
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
- p.apply(TextIO.Read.named("ReadInvalidGcsFile")
- .from("gs://bucket/tmp//file"));
+ p.apply("ReadInvalidGcsFile", TextIO.Read.from("gs://bucket/tmp//file"));
thrown.expectCause(Matchers.allOf(
instanceOf(IllegalArgumentException.class),
@@ -502,11 +502,11 @@ public class DataflowRunnerTest {
@Test
public void testMultiSlashGcsFileWritePath() throws IOException {
Pipeline p = buildDataflowPipeline(buildPipelineOptions());
- PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
+ PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object"));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("consecutive slashes");
- pc.apply(TextIO.Write.named("WriteInvalidGcsFile").to("gs://bucket/tmp//file"));
+ pc.apply("WriteInvalidGcsFile", TextIO.Write.to("gs://bucket/tmp//file"));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index da3fa7a..6a3edd7 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -89,8 +89,7 @@ public class SimpleWordCountTest {
PCollection<String> output = inputWords.apply(new CountWords());
File outputFile = testFolder.newFile();
- output.apply(
- TextIO.Write.named("WriteCounts").to(outputFile.getAbsolutePath()).withoutSharding());
+ output.apply("WriteCounts", TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());
EvaluationResult res = SparkRunner.create().run(p);
res.close();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 6a36c8d..7cac705 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -518,7 +518,6 @@ public class BigQueryIO {
+ " query without a result flattening preference");
}
- // Only verify existence/correctness if validation is enabled.
if (validate) {
// Check for source table/query presence for early failure notification.
// Note that a presence check can fail if the table or dataset are created by earlier
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
index da886de..6e26d33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
@@ -136,6 +136,18 @@ public class AvroIOGeneratedClassTest {
return users;
}
+ <T> void runTestRead(
+ String applyName, AvroIO.Read.Bound<T> read, String expectedName, T[] expectedOutput)
+ throws Exception {
+ generateAvroFile(generateAvroObjects());
+
+ TestPipeline p = TestPipeline.create();
+ PCollection<T> output = p.apply(applyName, read);
+ PAssert.that(output).containsInAnyOrder(expectedOutput);
+ p.run();
+ assertEquals(expectedName, output.getName());
+ }
+
<T> void runTestRead(AvroIO.Read.Bound<T> read, String expectedName, T[] expectedOutput)
throws Exception {
generateAvroFile(generateAvroObjects());
@@ -158,28 +170,16 @@ public class AvroIOGeneratedClassTest {
AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()),
"AvroIO.Read/Read.out",
generateAvroObjects());
- runTestRead(
- AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
+ runTestRead("MyRead",
+ AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
"MyRead/Read.out",
generateAvroObjects());
- runTestRead(
- AvroIO.Read.named("MyRead").withSchema(AvroGeneratedUser.class).from(avroFile.getPath()),
+ runTestRead("MyRead",
+ AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()),
"MyRead/Read.out",
generateAvroObjects());
- runTestRead(
- AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class).named("HerRead"),
- "HerRead/Read.out",
- generateAvroObjects());
- runTestRead(
- AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(AvroGeneratedUser.class),
- "HerRead/Read.out",
- generateAvroObjects());
- runTestRead(
- AvroIO.Read.withSchema(AvroGeneratedUser.class).named("HerRead").from(avroFile.getPath()),
- "HerRead/Read.out",
- generateAvroObjects());
- runTestRead(
- AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()).named("HerRead"),
+ runTestRead("HerRead",
+ AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
"HerRead/Read.out",
generateAvroObjects());
}
@@ -195,28 +195,20 @@ public class AvroIOGeneratedClassTest {
AvroIO.Read.withSchema(schema).from(avroFile.getPath()),
"AvroIO.Read/Read.out",
generateAvroGenericRecords());
- runTestRead(
- AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(schema),
+ runTestRead("MyRead",
+ AvroIO.Read.from(avroFile.getPath()).withSchema(schema),
"MyRead/Read.out",
generateAvroGenericRecords());
- runTestRead(
- AvroIO.Read.named("MyRead").withSchema(schema).from(avroFile.getPath()),
+ runTestRead("MyRead",
+ AvroIO.Read.withSchema(schema).from(avroFile.getPath()),
"MyRead/Read.out",
generateAvroGenericRecords());
- runTestRead(
- AvroIO.Read.from(avroFile.getPath()).withSchema(schema).named("HerRead"),
- "HerRead/Read.out",
- generateAvroGenericRecords());
- runTestRead(
- AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(schema),
- "HerRead/Read.out",
- generateAvroGenericRecords());
- runTestRead(
- AvroIO.Read.withSchema(schema).named("HerRead").from(avroFile.getPath()),
+ runTestRead("HerRead",
+ AvroIO.Read.from(avroFile.getPath()).withSchema(schema),
"HerRead/Read.out",
generateAvroGenericRecords());
- runTestRead(
- AvroIO.Read.withSchema(schema).from(avroFile.getPath()).named("HerRead"),
+ runTestRead("HerRead",
+ AvroIO.Read.from(avroFile.getPath()).withSchema(schema),
"HerRead/Read.out",
generateAvroGenericRecords());
}
@@ -232,28 +224,12 @@ public class AvroIOGeneratedClassTest {
AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()),
"AvroIO.Read/Read.out",
generateAvroGenericRecords());
- runTestRead(
- AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(schemaString),
- "MyRead/Read.out",
- generateAvroGenericRecords());
- runTestRead(
- AvroIO.Read.named("MyRead").withSchema(schemaString).from(avroFile.getPath()),
+ runTestRead("MyRead",
+ AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString),
"MyRead/Read.out",
generateAvroGenericRecords());
- runTestRead(
- AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString).named("HerRead"),
- "HerRead/Read.out",
- generateAvroGenericRecords());
- runTestRead(
- AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(schemaString),
- "HerRead/Read.out",
- generateAvroGenericRecords());
- runTestRead(
- AvroIO.Read.withSchema(schemaString).named("HerRead").from(avroFile.getPath()),
- "HerRead/Read.out",
- generateAvroGenericRecords());
- runTestRead(
- AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()).named("HerRead"),
+ runTestRead("HerRead",
+ AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()),
"HerRead/Read.out",
generateAvroGenericRecords());
}
@@ -276,106 +252,34 @@ public class AvroIOGeneratedClassTest {
@Test
@Category(NeedsRunner.class)
public void testWriteFromGeneratedClass() throws Exception {
- runTestWrite(AvroIO.Write.to(avroFile.getPath())
- .withSchema(AvroGeneratedUser.class),
- "AvroIO.Write");
- runTestWrite(AvroIO.Write.withSchema(AvroGeneratedUser.class)
- .to(avroFile.getPath()),
- "AvroIO.Write");
- runTestWrite(AvroIO.Write.named("MyWrite")
- .to(avroFile.getPath())
- .withSchema(AvroGeneratedUser.class),
- "MyWrite");
- runTestWrite(AvroIO.Write.named("MyWrite")
- .withSchema(AvroGeneratedUser.class)
- .to(avroFile.getPath()),
- "MyWrite");
- runTestWrite(AvroIO.Write.to(avroFile.getPath())
- .withSchema(AvroGeneratedUser.class)
- .named("HerWrite"),
- "HerWrite");
- runTestWrite(AvroIO.Write.to(avroFile.getPath())
- .named("HerWrite")
- .withSchema(AvroGeneratedUser.class),
- "HerWrite");
- runTestWrite(AvroIO.Write.withSchema(AvroGeneratedUser.class)
- .named("HerWrite")
- .to(avroFile.getPath()),
- "HerWrite");
- runTestWrite(AvroIO.Write.withSchema(AvroGeneratedUser.class)
- .to(avroFile.getPath())
- .named("HerWrite"),
- "HerWrite");
+ runTestWrite(
+ AvroIO.Write.to(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
+ "AvroIO.Write");
+ runTestWrite(
+ AvroIO.Write.withSchema(AvroGeneratedUser.class).to(avroFile.getPath()),
+ "AvroIO.Write");
}
@Test
@Category(NeedsRunner.class)
public void testWriteFromSchema() throws Exception {
- runTestWrite(AvroIO.Write.to(avroFile.getPath())
- .withSchema(schema),
- "AvroIO.Write");
- runTestWrite(AvroIO.Write.withSchema(schema)
- .to(avroFile.getPath()),
- "AvroIO.Write");
- runTestWrite(AvroIO.Write.named("MyWrite")
- .to(avroFile.getPath())
- .withSchema(schema),
- "MyWrite");
- runTestWrite(AvroIO.Write.named("MyWrite")
- .withSchema(schema)
- .to(avroFile.getPath()),
- "MyWrite");
- runTestWrite(AvroIO.Write.to(avroFile.getPath())
- .withSchema(schema)
- .named("HerWrite"),
- "HerWrite");
- runTestWrite(AvroIO.Write.to(avroFile.getPath())
- .named("HerWrite")
- .withSchema(schema),
- "HerWrite");
- runTestWrite(AvroIO.Write.withSchema(schema)
- .named("HerWrite")
- .to(avroFile.getPath()),
- "HerWrite");
- runTestWrite(AvroIO.Write.withSchema(schema)
- .to(avroFile.getPath())
- .named("HerWrite"),
- "HerWrite");
+ runTestWrite(
+ AvroIO.Write.to(avroFile.getPath()).withSchema(schema),
+ "AvroIO.Write");
+ runTestWrite(
+ AvroIO.Write.withSchema(schema).to(avroFile.getPath()),
+ "AvroIO.Write");
}
@Test
@Category(NeedsRunner.class)
public void testWriteFromSchemaString() throws Exception {
- runTestWrite(AvroIO.Write.to(avroFile.getPath())
- .withSchema(schemaString),
- "AvroIO.Write");
- runTestWrite(AvroIO.Write.withSchema(schemaString)
- .to(avroFile.getPath()),
- "AvroIO.Write");
- runTestWrite(AvroIO.Write.named("MyWrite")
- .to(avroFile.getPath())
- .withSchema(schemaString),
- "MyWrite");
- runTestWrite(AvroIO.Write.named("MyWrite")
- .withSchema(schemaString)
- .to(avroFile.getPath()),
- "MyWrite");
- runTestWrite(AvroIO.Write.to(avroFile.getPath())
- .withSchema(schemaString)
- .named("HerWrite"),
- "HerWrite");
- runTestWrite(AvroIO.Write.to(avroFile.getPath())
- .named("HerWrite")
- .withSchema(schemaString),
- "HerWrite");
- runTestWrite(AvroIO.Write.withSchema(schemaString)
- .named("HerWrite")
- .to(avroFile.getPath()),
- "HerWrite");
- runTestWrite(AvroIO.Write.withSchema(schemaString)
- .to(avroFile.getPath())
- .named("HerWrite"),
- "HerWrite");
+ runTestWrite(
+ AvroIO.Write.to(avroFile.getPath()).withSchema(schemaString),
+ "AvroIO.Write");
+ runTestWrite(
+ AvroIO.Write.withSchema(schemaString).to(avroFile.getPath()),
+ "AvroIO.Write");
}
// TODO: for Write only, test withSuffix, withNumShards,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 13c1bcf..8625b10 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -81,10 +82,6 @@ public class AvroIOTest {
public void testAvroIOGetName() {
assertEquals("AvroIO.Read", AvroIO.Read.from("gs://bucket/foo*/baz").getName());
assertEquals("AvroIO.Write", AvroIO.Write.to("gs://bucket/foo/baz").getName());
- assertEquals("ReadMyFile",
- AvroIO.Read.named("ReadMyFile").from("gs://bucket/foo*/baz").getName());
- assertEquals("WriteMyFile",
- AvroIO.Write.named("WriteMyFile").to("gs://bucket/foo/baz").getName());
}
@DefaultCoder(AvroCoder.class)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index a1daf72..f0d3fce 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -340,8 +340,7 @@ public class BigQueryIOTest implements Serializable {
checkReadTableObjectWithValidate(bound, project, dataset, table, true);
}
- private void checkReadQueryObject(
- BigQueryIO.Read.Bound bound, String query) {
+ private void checkReadQueryObject(BigQueryIO.Read.Bound bound, String query) {
checkReadQueryObjectWithValidate(bound, query, true);
}
@@ -393,15 +392,13 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildTableBasedSource() {
- BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable")
- .from("foo.com:project:somedataset.sometable");
+ BigQueryIO.Read.Bound bound = BigQueryIO.Read.from("foo.com:project:somedataset.sometable");
checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
}
@Test
public void testBuildQueryBasedSource() {
- BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyQuery")
- .fromQuery("foo_query");
+ BigQueryIO.Read.Bound bound = BigQueryIO.Read.fromQuery("foo_query");
checkReadQueryObject(bound, "foo_query");
}
@@ -409,8 +406,8 @@ public class BigQueryIOTest implements Serializable {
public void testBuildTableBasedSourceWithoutValidation() {
// This test just checks that using withoutValidation will not trigger object
// construction errors.
- BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable")
- .from("foo.com:project:somedataset.sometable").withoutValidation();
+ BigQueryIO.Read.Bound bound =
+ BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation();
checkReadTableObjectWithValidate(bound, "foo.com:project", "somedataset", "sometable", false);
}
@@ -418,15 +415,15 @@ public class BigQueryIOTest implements Serializable {
public void testBuildQueryBasedSourceWithoutValidation() {
// This test just checks that using withoutValidation will not trigger object
// construction errors.
- BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable")
- .fromQuery("some_query").withoutValidation();
+ BigQueryIO.Read.Bound bound =
+ BigQueryIO.Read.fromQuery("some_query").withoutValidation();
checkReadQueryObjectWithValidate(bound, "some_query", false);
}
@Test
public void testBuildTableBasedSourceWithDefaultProject() {
- BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable")
- .from("somedataset.sometable");
+ BigQueryIO.Read.Bound bound =
+ BigQueryIO.Read.from("somedataset.sometable");
checkReadTableObject(bound, null, "somedataset", "sometable");
}
@@ -436,8 +433,7 @@ public class BigQueryIOTest implements Serializable {
.setProjectId("foo.com:project")
.setDatasetId("somedataset")
.setTableId("sometable");
- BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable")
- .from(table);
+ BigQueryIO.Read.Bound bound = BigQueryIO.Read.from(table);
checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
}
@@ -457,18 +453,7 @@ public class BigQueryIOTest implements Serializable {
thrown.expectMessage(
Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence"))
.or(Matchers.containsString("BigQuery dataset not found for table")));
- p.apply(BigQueryIO.Read.named("ReadMyTable").from(tableRef));
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testBuildSourceWithoutTableOrQuery() {
- Pipeline p = TestPipeline.create();
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(
- "Invalid BigQuery read operation, either table reference or query has to be set");
- p.apply(BigQueryIO.Read.named("ReadMyTable"));
- p.run();
+ p.apply(BigQueryIO.Read.from(tableRef));
}
@Test
@@ -490,8 +475,8 @@ public class BigQueryIOTest implements Serializable {
thrown.expectMessage(
"Invalid BigQuery read operation. Specifies both a query and a table, only one of these"
+ " should be provided");
- p.apply(
- BigQueryIO.Read.named("ReadMyTable")
+ p.apply("ReadMyTable",
+ BigQueryIO.Read
.from("foo.com:project:somedataset.sometable")
.fromQuery("query"));
p.run();
@@ -505,8 +490,8 @@ public class BigQueryIOTest implements Serializable {
thrown.expectMessage(
"Invalid BigQuery read operation. Specifies a"
+ " table with a result flattening preference, which is not configurable");
- p.apply(
- BigQueryIO.Read.named("ReadMyTable")
+ p.apply("ReadMyTable",
+ BigQueryIO.Read
.from("foo.com:project:somedataset.sometable")
.withoutResultFlattening());
p.run();
@@ -521,7 +506,7 @@ public class BigQueryIOTest implements Serializable {
"Invalid BigQuery read operation. Specifies a"
+ " table with a result flattening preference, which is not configurable");
p.apply(
- BigQueryIO.Read.named("ReadMyTable")
+ BigQueryIO.Read
.from("foo.com:project:somedataset.sometable")
.withoutValidation()
.withoutResultFlattening());
@@ -644,8 +629,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildSink() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
- .to("foo.com:project:somedataset.sometable");
+ BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
checkWriteObject(
bound, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
@@ -655,8 +639,8 @@ public class BigQueryIOTest implements Serializable {
public void testBuildSinkwithoutValidation() {
// This test just checks that using withoutValidation will not trigger object
// construction errors.
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
- .to("foo.com:project:somedataset.sometable").withoutValidation();
+ BigQueryIO.Write.Bound bound =
+ BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withoutValidation();
checkWriteObjectWithValidate(
bound, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, false);
@@ -664,8 +648,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildSinkDefaultProject() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
- .to("somedataset.sometable");
+ BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("somedataset.sometable");
checkWriteObject(
bound, null, "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
@@ -677,8 +660,7 @@ public class BigQueryIOTest implements Serializable {
.setProjectId("foo.com:project")
.setDatasetId("somedataset")
.setTableId("sometable");
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
- .to(table);
+ BigQueryIO.Write.Bound bound = BigQueryIO.Write.to(table);
checkWriteObject(
bound, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
@@ -691,14 +673,14 @@ public class BigQueryIOTest implements Serializable {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("must set the table reference");
p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()))
- .apply(BigQueryIO.Write.named("WriteMyTable"));
+ .apply(BigQueryIO.Write.withoutValidation());
}
@Test
public void testBuildSinkWithSchema() {
TableSchema schema = new TableSchema();
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
- .to("foo.com:project:somedataset.sometable").withSchema(schema);
+ BigQueryIO.Write.Bound bound =
+ BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withSchema(schema);
checkWriteObject(
bound, "foo.com:project", "somedataset", "sometable",
schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
@@ -706,7 +688,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildSinkWithCreateDispositionNever() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
+ BigQueryIO.Write.Bound bound = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withCreateDisposition(CreateDisposition.CREATE_NEVER);
checkWriteObject(
@@ -716,7 +698,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildSinkWithCreateDispositionIfNeeded() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
+ BigQueryIO.Write.Bound bound = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
checkWriteObject(
@@ -726,7 +708,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildSinkWithWriteDispositionTruncate() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
+ BigQueryIO.Write.Bound bound = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
checkWriteObject(
@@ -736,7 +718,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildSinkWithWriteDispositionAppend() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
+ BigQueryIO.Write.Bound bound = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withWriteDisposition(WriteDisposition.WRITE_APPEND);
checkWriteObject(
@@ -746,7 +728,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildSinkWithWriteDispositionEmpty() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
+ BigQueryIO.Write.Bound bound = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withWriteDisposition(WriteDisposition.WRITE_EMPTY);
checkWriteObject(
@@ -794,7 +776,7 @@ public class BigQueryIOTest implements Serializable {
Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence"))
.or(Matchers.containsString("BigQuery dataset not found for table")));
p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()))
- .apply(BigQueryIO.Write.named("WriteMyTable")
+ .apply(BigQueryIO.Write
.to(tableRef)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withSchema(new TableSchema()));
@@ -878,8 +860,6 @@ public class BigQueryIOTest implements Serializable {
public void testBigQueryIOGetName() {
assertEquals("BigQueryIO.Read", BigQueryIO.Read.from("somedataset.sometable").getName());
assertEquals("BigQueryIO.Write", BigQueryIO.Write.to("somedataset.sometable").getName());
- assertEquals("ReadMyTable", BigQueryIO.Read.named("ReadMyTable").getName());
- assertEquals("WriteMyTable", BigQueryIO.Write.named("WriteMyTable").getName());
}
@Test
@@ -915,7 +895,7 @@ public class BigQueryIOTest implements Serializable {
thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform");
TestPipeline.create()
.apply(Create.<TableRow>of())
- .apply(BigQueryIO.Write.named("name"));
+ .apply("name", BigQueryIO.Write.withoutValidation());
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index b0c577d..c9f4079 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionE
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -717,7 +718,7 @@ public class FileBasedSourceTest {
File file = createFileWithData(fileName, data);
TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 64, null);
- PCollection<String> output = p.apply(Read.from(source).named("ReadFileData"));
+ PCollection<String> output = p.apply("ReadFileData", Read.from(source));
PAssert.that(output).containsInAnyOrder(data);
p.run();
@@ -743,7 +744,7 @@ public class FileBasedSourceTest {
TestFileBasedSource source =
new TestFileBasedSource(new File(file1.getParent(), "file*").getPath(), 64, null);
- PCollection<String> output = p.apply(Read.from(source).named("ReadFileData"));
+ PCollection<String> output = p.apply("ReadFileData", Read.from(source));
List<String> expectedResults = new ArrayList<String>();
expectedResults.addAll(data1);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index eaf452d..efa1cd2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -45,10 +45,6 @@ public class PubsubIOTest {
PubsubIO.Read.topic("projects/myproject/topics/mytopic").getName());
assertEquals("PubsubIO.Write",
PubsubIO.Write.topic("projects/myproject/topics/mytopic").getName());
- assertEquals("ReadMyTopic",
- PubsubIO.Read.named("ReadMyTopic").topic("projects/myproject/topics/mytopic").getName());
- assertEquals("WriteMyTopic",
- PubsubIO.Write.named("WriteMyTopic").topic("projects/myproject/topics/mytopic").getName());
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index c3a5084..df598c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY;
import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -44,14 +45,12 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
import com.google.common.collect.ImmutableList;
@@ -167,16 +166,9 @@ public class TextIOTest {
}
{
- PCollection<String> output2 =
- p.apply(TextIO.Read.named("MyRead").from(file));
+ PCollection<String> output2 = p.apply("MyRead", TextIO.Read.from(file));
assertEquals("MyRead/Read.out", output2.getName());
}
-
- {
- PCollection<String> output3 =
- p.apply(TextIO.Read.from(file).named("HerRead"));
- assertEquals("HerRead/Read.out", output3.getName());
- }
}
@Test
@@ -299,27 +291,6 @@ public class TextIOTest {
}
@Test
- public void testWriteNamed() {
- {
- PTransform<PCollection<String>, PDone> transform1 =
- TextIO.Write.to("/tmp/file.txt");
- assertEquals("TextIO.Write", transform1.getName());
- }
-
- {
- PTransform<PCollection<String>, PDone> transform2 =
- TextIO.Write.named("MyWrite").to("/tmp/file.txt");
- assertEquals("MyWrite", transform2.getName());
- }
-
- {
- PTransform<PCollection<String>, PDone> transform3 =
- TextIO.Write.to("/tmp/file.txt").named("HerWrite");
- assertEquals("HerWrite", transform3.getName());
- }
- }
-
- @Test
@Category(NeedsRunner.class)
public void testShardedWrite() throws Exception {
runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), 5);
@@ -620,12 +591,8 @@ public class TextIOTest {
public void testTextIOGetName() {
assertEquals("TextIO.Read", TextIO.Read.from("somefile").getName());
assertEquals("TextIO.Write", TextIO.Write.to("somefile").getName());
- assertEquals("ReadMyFile", TextIO.Read.named("ReadMyFile").from("somefile").getName());
- assertEquals("WriteMyFile", TextIO.Write.named("WriteMyFile").to("somefile").getName());
assertEquals("TextIO.Read", TextIO.Read.from("somefile").toString());
- assertEquals(
- "ReadMyFile [TextIO.Read]", TextIO.Read.named("ReadMyFile").from("somefile").toString());
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
index eb65468..37e3881 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionE
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
@@ -582,7 +583,7 @@ public class XmlSourceTest {
.withRecordClass(Train.class)
.withMinBundleSize(1024);
- PCollection<Train> output = p.apply(Read.from(source).named("ReadFileData"));
+ PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
List<Train> expectedResults =
ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null),
@@ -672,7 +673,7 @@ public class XmlSourceTest {
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(1024);
- PCollection<Train> output = p.apply(Read.from(source).named("ReadFileData"));
+ PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
PAssert.that(output).containsInAnyOrder(trains);
p.run();
@@ -814,13 +815,13 @@ public class XmlSourceTest {
Pipeline p = TestPipeline.create();
- XmlSource<Train> source = XmlSource.<Train>from(file.getParent() + "/"
- + "temp*.xml")
- .withRootElement("trains")
- .withRecordElement("train")
- .withRecordClass(Train.class)
- .withMinBundleSize(1024);
- PCollection<Train> output = p.apply(Read.from(source).named("ReadFileData"));
+ XmlSource<Train> source =
+ XmlSource.<Train>from(file.getParent() + "/" + "temp*.xml")
+ .withRootElement("trains")
+ .withRecordElement("train")
+ .withRecordClass(Train.class)
+ .withMinBundleSize(1024);
+ PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
List<Train> expectedResults = new ArrayList<>();
expectedResults.addAll(trains1);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 0c992c4..08c3996 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -120,9 +120,9 @@ public class TransformTreeTest {
Pipeline p = TestPipeline.create();
- p.apply(TextIO.Read.named("ReadMyFile").from(inputFile.getPath()))
+ p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath()))
.apply(Sample.<String>any(10))
- .apply(TextIO.Write.named("WriteMyFile").to(outputFile.getPath()));
+ .apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
final EnumSet<TransformsSeen> visited =
EnumSet.noneOf(TransformsSeen.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index c858f32..76bc038 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -119,11 +119,11 @@ public class WindowTest implements Serializable {
FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25));
WindowingStrategy<?, ?> strategy = TestPipeline.create()
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
- .apply(Window.named("WindowInto10").<String>into(fixed10)
+ .apply("WindowInto10", Window.<String>into(fixed10)
.withAllowedLateness(Duration.standardDays(1))
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5)))
.accumulatingFiredPanes())
- .apply(Window.named("WindowInto25").<String>into(fixed25))
+ .apply("WindowInto25", Window.<String>into(fixed25))
.getWindowingStrategy();
assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
@@ -272,7 +272,7 @@ public class WindowTest implements Serializable {
@Test
public void testDisplayDataExcludesUnspecifiedProperties() {
- Window.Bound<?> onlyHasAccumulationMode = Window.named("foobar").discardingFiredPanes();
+ Window.Bound<?> onlyHasAccumulationMode = Window.discardingFiredPanes();
assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf(
"windowFn",
"trigger",
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 21f58df..c1e092a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -221,7 +221,7 @@ public class WindowingTest implements Serializable {
Pipeline p = TestPipeline.create();
PCollection<String> output = p.begin()
- .apply(TextIO.Read.named("ReadLines").from(filename))
+ .apply("ReadLines", TextIO.Read.from(filename))
.apply(ParDo.of(new ExtractWordsWithTimestampsFn()))
.apply(new WindowedCount(FixedWindows.of(Duration.millis(10))));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
index 3306cb4..c0e5b17 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
@@ -174,7 +174,7 @@ public class DebuggingWordCount {
Pipeline p = Pipeline.create(options);
PCollection<KV<String, Long>> filteredWords =
- p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+ p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
.apply(new WordCount.CountWords())
.apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
index 07ed6d0..803e800 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -195,10 +195,10 @@ public class WordCount {
// Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
// static FormatAsTextFn() to the ParDo transform.
- p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+ p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
.apply(new CountWords())
.apply(ParDo.of(new FormatAsTextFn()))
- .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
+ .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
p.run();
}
[3/3] incubator-beam git commit: This closes #529
Posted by bc...@apache.org.
This closes #529
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9abd0926
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9abd0926
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9abd0926
Branch: refs/heads/master
Commit: 9abd0926ac471c178eb30eeb01c964f9ecbd9cce
Parents: 4f580f5 fc52a10
Author: bchambers <bc...@google.com>
Authored: Mon Jun 27 10:29:31 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Mon Jun 27 10:29:31 2016 -0700
----------------------------------------------------------------------
.../beam/examples/DebuggingWordCount.java | 2 +-
.../org/apache/beam/examples/WordCount.java | 4 +-
.../apache/beam/examples/complete/TfIdf.java | 3 +-
.../examples/complete/TopWikipediaSessions.java | 2 +-
.../examples/cookbook/DatastoreWordCount.java | 2 +-
.../beam/examples/cookbook/DeDupExample.java | 5 +-
.../beam/examples/cookbook/TriggerExample.java | 4 +-
.../beam/examples/complete/game/GameStats.java | 28 ++-
.../examples/complete/game/HourlyTeamScore.java | 5 +-
.../examples/complete/game/LeaderBoard.java | 8 +-
.../beam/runners/flink/examples/TFIDF.java | 3 +-
.../beam/runners/flink/examples/WordCount.java | 4 +-
.../flink/examples/streaming/AutoComplete.java | 9 +-
.../flink/examples/streaming/JoinExamples.java | 13 +-
.../KafkaWindowedWordCountExample.java | 2 +-
.../examples/streaming/WindowedWordCount.java | 3 +-
.../beam/runners/dataflow/DataflowRunner.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 6 +-
.../runners/dataflow/DataflowRunnerTest.java | 18 +-
.../beam/runners/spark/SimpleWordCountTest.java | 3 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 53 +----
.../java/org/apache/beam/sdk/io/BigQueryIO.java | 43 +----
.../java/org/apache/beam/sdk/io/PubsubIO.java | 35 +---
.../main/java/org/apache/beam/sdk/io/Read.java | 29 +--
.../java/org/apache/beam/sdk/io/TextIO.java | 55 +-----
.../org/apache/beam/sdk/io/package-info.java | 6 +-
.../beam/sdk/transforms/windowing/Window.java | 42 ----
.../beam/sdk/io/AvroIOGeneratedClassTest.java | 192 +++++--------------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 5 +-
.../org/apache/beam/sdk/io/BigQueryIOTest.java | 82 +++-----
.../apache/beam/sdk/io/FileBasedSourceTest.java | 5 +-
.../org/apache/beam/sdk/io/PubsubIOTest.java | 4 -
.../java/org/apache/beam/sdk/io/TextIOTest.java | 37 +---
.../org/apache/beam/sdk/io/XmlSourceTest.java | 19 +-
.../beam/sdk/runners/TransformTreeTest.java | 4 +-
.../sdk/transforms/windowing/WindowTest.java | 6 +-
.../sdk/transforms/windowing/WindowingTest.java | 2 +-
.../src/main/java/DebuggingWordCount.java | 2 +-
.../src/main/java/WordCount.java | 4 +-
39 files changed, 183 insertions(+), 568 deletions(-)
----------------------------------------------------------------------