You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/29 22:31:59 UTC
[7/9] beam git commit: Renames PubsubIO.Read builder methods to be
style guide compliant
Renames PubsubIO.Read builder methods to be style guide compliant
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d8fbc4c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d8fbc4c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d8fbc4c
Branch: refs/heads/master
Commit: 5d8fbc4c4d87f75ea84a40c2ee36531eb0eda26f
Parents: f4d0460
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:19:37 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 2 +-
.../examples/complete/game/LeaderBoard.java | 2 +-
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 24 ++++++-------
.../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 38 ++++++++++----------
4 files changed, 33 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5d8fbc4c/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index e0048b7..d95eb06 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -253,7 +253,7 @@ public class GameStats extends LeaderBoard {
// Read Events from Pub/Sub using custom timestamps
PCollection<GameActionInfo> rawEvents = pipeline
.apply(PubsubIO.<String>read()
- .timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())
+ .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic())
.withCoder(StringUtf8Coder.of()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
http://git-wip-us.apache.org/repos/asf/beam/blob/5d8fbc4c/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 96f4291..a87468a 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -192,7 +192,7 @@ public class LeaderBoard extends HourlyTeamScore {
// data elements, and parse the data.
PCollection<GameActionInfo> gameEvents = pipeline
.apply(PubsubIO.<String>read()
- .timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())
+ .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic())
.withCoder(StringUtf8Coder.of()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
http://git-wip-us.apache.org/repos/asf/beam/blob/5d8fbc4c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 3c76942..20aed6d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -519,14 +519,14 @@ public class PubsubIO {
* some arbitrary portion of the data. Most likely, separate readers should
* use their own subscriptions.
*/
- public Read<T> subscription(String subscription) {
- return subscription(StaticValueProvider.of(subscription));
+ public Read<T> fromSubscription(String subscription) {
+ return fromSubscription(StaticValueProvider.of(subscription));
}
/**
* Like {@code subscription()} but with a {@link ValueProvider}.
*/
- public Read<T> subscription(ValueProvider<String> subscription) {
+ public Read<T> fromSubscription(ValueProvider<String> subscription) {
if (subscription.isAccessible()) {
// Validate.
PubsubSubscription.fromPath(subscription.get());
@@ -541,7 +541,7 @@ public class PubsubIO {
/**
* Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
- * with {@link #subscription(String)}.
+ * with {@link #fromSubscription(String)}.
*
* <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
* of the {@code topic} string.
@@ -550,14 +550,14 @@ public class PubsubIO {
* pipeline is started. Any data published on the topic before the pipeline is started will
* not be read by the runner.
*/
- public Read<T> topic(String topic) {
- return topic(StaticValueProvider.of(topic));
+ public Read<T> fromTopic(String topic) {
+ return fromTopic(StaticValueProvider.of(topic));
}
/**
* Like {@code topic()} but with a {@link ValueProvider}.
*/
- public Read<T> topic(ValueProvider<String> topic) {
+ public Read<T> fromTopic(ValueProvider<String> topic) {
if (topic.isAccessible()) {
// Validate.
PubsubTopic.fromPath(topic.get());
@@ -598,7 +598,7 @@ public class PubsubIO {
*
* @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
*/
- public Read<T> timestampLabel(String timestampLabel) {
+ public Read<T> withTimestampLabel(String timestampLabel) {
return toBuilder().setTimestampLabel(timestampLabel).build();
}
@@ -611,7 +611,7 @@ public class PubsubIO {
* If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
* be delivered, and deduplication of the stream will be strictly best effort.
*/
- public Read<T> idLabel(String idLabel) {
+ public Read<T> withIdLabel(String idLabel) {
return toBuilder().setIdLabel(idLabel).build();
}
@@ -628,7 +628,7 @@ public class PubsubIO {
* A Coder for the output type T must be registered or set on the output via
* {@link PCollection#setCoder(Coder)}.
*/
- public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
+ public Read<T> withParseFn(SimpleFunction<PubsubMessage, T> parseFn) {
return toBuilder().setParseFn(parseFn).build();
}
@@ -760,7 +760,7 @@ public class PubsubIO {
* time classes, {@link Instant#Instant(long)} can be used to parse this value.
*
* <p>If the output from this sink is being read by another Beam pipeline, then
- * {@link PubsubIO.Read#timestampLabel(String)} can be used to ensure the other source reads
+ * {@link PubsubIO.Read#withTimestampLabel(String)} can be used to ensure the other source reads
* these timestamps from the appropriate attribute.
*/
public Write<T> timestampLabel(String timestampLabel) {
@@ -773,7 +773,7 @@ public class PubsubIO {
* opaque string.
*
* <p>If the the output from this sink is being read by another Beam pipeline, then
- * {@link PubsubIO.Read#idLabel(String)} can be used to ensure that* the other source reads
+ * {@link PubsubIO.Read#withIdLabel(String)} can be used to ensure that* the other source reads
* these unique identifiers from the appropriate attribute.
*/
public Write<T> idLabel(String idLabel) {
http://git-wip-us.apache.org/repos/asf/beam/blob/5d8fbc4c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 7fe6e26..f44fffc 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -50,19 +50,19 @@ public class PubsubIOTest {
@Test
public void testPubsubIOGetName() {
assertEquals("PubsubIO.Read",
- PubsubIO.<String>read().topic("projects/myproject/topics/mytopic").getName());
+ PubsubIO.<String>read().fromTopic("projects/myproject/topics/mytopic").getName());
assertEquals("PubsubIO.Write",
PubsubIO.<String>write().topic("projects/myproject/topics/mytopic").getName());
}
@Test
public void testTopicValidationSuccess() throws Exception {
- PubsubIO.<String>read().topic("projects/my-project/topics/abc");
- PubsubIO.<String>read().topic("projects/my-project/topics/ABC");
- PubsubIO.<String>read().topic("projects/my-project/topics/AbC-DeF");
- PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234");
- PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
- PubsubIO.<String>read().topic(new StringBuilder()
+ PubsubIO.<String>read().fromTopic("projects/my-project/topics/abc");
+ PubsubIO.<String>read().fromTopic("projects/my-project/topics/ABC");
+ PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-DeF");
+ PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-1234");
+ PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
+ PubsubIO.<String>read().fromTopic(new StringBuilder()
.append("projects/my-project/topics/A-really-long-one-")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
@@ -73,13 +73,13 @@ public class PubsubIOTest {
@Test
public void testTopicValidationBadCharacter() throws Exception {
thrown.expect(IllegalArgumentException.class);
- PubsubIO.<String>read().topic("projects/my-project/topics/abc-*-abc");
+ PubsubIO.<String>read().fromTopic("projects/my-project/topics/abc-*-abc");
}
@Test
public void testTopicValidationTooLong() throws Exception {
thrown.expect(IllegalArgumentException.class);
- PubsubIO.<String>read().topic(new StringBuilder().append
+ PubsubIO.<String>read().fromTopic(new StringBuilder().append
("projects/my-project/topics/A-really-long-one-")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
@@ -93,9 +93,9 @@ public class PubsubIOTest {
String subscription = "projects/project/subscriptions/subscription";
Duration maxReadTime = Duration.standardMinutes(5);
PubsubIO.Read<String> read = PubsubIO.<String>read()
- .topic(StaticValueProvider.of(topic))
- .timestampLabel("myTimestamp")
- .idLabel("myId");
+ .fromTopic(StaticValueProvider.of(topic))
+ .withTimestampLabel("myTimestamp")
+ .withIdLabel("myId");
DisplayData displayData = DisplayData.from(read);
@@ -110,9 +110,9 @@ public class PubsubIOTest {
String subscription = "projects/project/subscriptions/subscription";
Duration maxReadTime = Duration.standardMinutes(5);
PubsubIO.Read<String> read = PubsubIO.<String>read()
- .subscription(StaticValueProvider.of(subscription))
- .timestampLabel("myTimestamp")
- .idLabel("myId");
+ .fromSubscription(StaticValueProvider.of(subscription))
+ .withTimestampLabel("myTimestamp")
+ .withIdLabel("myId");
DisplayData displayData = DisplayData.from(read);
@@ -125,7 +125,7 @@ public class PubsubIOTest {
public void testNullTopic() {
String subscription = "projects/project/subscriptions/subscription";
PubsubIO.Read<String> read = PubsubIO.<String>read()
- .subscription(StaticValueProvider.of(subscription));
+ .fromSubscription(StaticValueProvider.of(subscription));
assertNull(read.getTopicProvider());
assertNotNull(read.getSubscriptionProvider());
assertNotNull(DisplayData.from(read));
@@ -135,7 +135,7 @@ public class PubsubIOTest {
public void testNullSubscription() {
String topic = "projects/project/topics/topic";
PubsubIO.Read<String> read = PubsubIO.<String>read()
- .topic(StaticValueProvider.of(topic));
+ .fromTopic(StaticValueProvider.of(topic));
assertNotNull(read.getTopicProvider());
assertNull(read.getSubscriptionProvider());
assertNotNull(DisplayData.from(read));
@@ -149,13 +149,13 @@ public class PubsubIOTest {
PubsubIO.Read<String> read = PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
// Reading from a subscription.
- read = read.subscription("projects/project/subscriptions/subscription");
+ read = read.fromSubscription("projects/project/subscriptions/subscription");
displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat("PubsubIO.Read should include the subscription in its primitive display data",
displayData, hasItem(hasDisplayItem("subscription")));
// Reading from a topic.
- read = read.topic("projects/project/topics/topic");
+ read = read.fromTopic("projects/project/topics/topic");
displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat("PubsubIO.Read should include the topic in its primitive display data",
displayData, hasItem(hasDisplayItem("topic")));