You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/27 17:35:59 UTC
[1/2] incubator-beam git commit: Closes #1696
Repository: incubator-beam
Updated Branches:
refs/heads/master ae46f9a87 -> ccb8b82c8
Closes #1696
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ccb8b82c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ccb8b82c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ccb8b82c
Branch: refs/heads/master
Commit: ccb8b82c868d16d65de57cbdcaff119a33785bbd
Parents: ae46f9a 8cb1ed5
Author: Dan Halperin <dh...@google.com>
Authored: Tue Dec 27 09:35:47 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Dec 27 09:35:47 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/PubsubIO.java | 24 ++++++++++++--------
.../org/apache/beam/sdk/io/PubsubIOTest.java | 16 +++++++++++++
2 files changed, 30 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: [BEAM-551] Improve validation in PubsubIO
Posted by dh...@apache.org.
[BEAM-551] Improve validation in PubsubIO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8cb1ed5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8cb1ed5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8cb1ed5f
Branch: refs/heads/master
Commit: 8cb1ed5fa61816a5e27539e14e58995d4844de6e
Parents: ae46f9a
Author: sammcveety <sa...@gmail.com>
Authored: Fri Dec 23 10:49:32 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Dec 27 09:35:47 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/PubsubIO.java | 24 ++++++++++++--------
.../org/apache/beam/sdk/io/PubsubIOTest.java | 16 +++++++++++++
2 files changed, 30 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8cb1ed5f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 9a6b534..9b5edd1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -138,7 +138,7 @@ public class PubsubIO {
* Populate common {@link DisplayData} between Pubsub source and sink.
*/
private static void populateCommonDisplayData(DisplayData.Builder builder,
- String timestampLabel, String idLabel, String topic) {
+ String timestampLabel, String idLabel, ValueProvider<PubsubTopic> topic) {
builder
.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)
.withLabel("Timestamp Label Attribute"))
@@ -146,7 +146,9 @@ public class PubsubIO {
.withLabel("ID Label Attribute"));
if (topic != null) {
- builder.add(DisplayData.item("topic", topic)
+ String topicString = topic.isAccessible() ? topic.get().asPath()
+ : topic.toString();
+ builder.add(DisplayData.item("topic", topicString)
.withLabel("Pubsub Topic"));
}
}
@@ -615,6 +617,10 @@ public class PubsubIO {
* Like {@code subscription()} but with a {@link ValueProvider}.
*/
public Bound<T> subscription(ValueProvider<String> subscription) {
+ if (subscription.isAccessible()) {
+ // Validate.
+ PubsubSubscription.fromPath(subscription.get());
+ }
return new Bound<>(name,
NestedValueProvider.of(subscription, new SubscriptionTranslator()),
topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
@@ -636,6 +642,10 @@ public class PubsubIO {
* Like {@code topic()} but with a {@link ValueProvider}.
*/
public Bound<T> topic(ValueProvider<String> topic) {
+ if (topic.isAccessible()) {
+ // Validate.
+ PubsubTopic.fromPath(topic.get());
+ }
return new Bound<>(name, subscription,
NestedValueProvider.of(topic, new TopicTranslator()),
timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
@@ -736,11 +746,7 @@ public class PubsubIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- String topicString =
- topic == null ? null
- : topic.isAccessible() ? topic.get().asPath()
- : topic.toString();
- populateCommonDisplayData(builder, timestampLabel, idLabel, topicString);
+ populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
builder
.addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
@@ -1080,9 +1086,7 @@ public class PubsubIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- String topicString = topic.isAccessible()
- ? topic.get().asPath() : topic.toString();
- populateCommonDisplayData(builder, timestampLabel, idLabel, topicString);
+ populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8cb1ed5f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index b73afb2..f1bf788 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -68,6 +68,22 @@ public class PubsubIOTest {
}
@Test
+ public void testTopicValidationBadCharacter() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ PubsubIO.Read.topic("projects/my-project/topics/abc-*-abc");
+ }
+
+ @Test
+ public void testTopicValidationTooLong() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ PubsubIO.Read.topic(new StringBuilder().append("projects/my-project/topics/A-really-long-one-")
+ .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
+ .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
+ .append("1111111111111111111111111111111111111111111111111111111111111111111111111111")
+ .toString());
+ }
+
+ @Test
public void testReadDisplayData() {
String topic = "projects/project/topics/topic";
String subscription = "projects/project/subscriptions/subscription";