You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/27 17:35:59 UTC

[1/2] incubator-beam git commit: Closes #1696

Repository: incubator-beam
Updated Branches:
  refs/heads/master ae46f9a87 -> ccb8b82c8


Closes #1696


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ccb8b82c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ccb8b82c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ccb8b82c

Branch: refs/heads/master
Commit: ccb8b82c868d16d65de57cbdcaff119a33785bbd
Parents: ae46f9a 8cb1ed5
Author: Dan Halperin <dh...@google.com>
Authored: Tue Dec 27 09:35:47 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Dec 27 09:35:47 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 24 ++++++++++++--------
 .../org/apache/beam/sdk/io/PubsubIOTest.java    | 16 +++++++++++++
 2 files changed, 30 insertions(+), 10 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: [BEAM-551] Improve validation in PubSubIO

Posted by dh...@apache.org.
[BEAM-551] Improve validation in PubSubIO


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8cb1ed5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8cb1ed5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8cb1ed5f

Branch: refs/heads/master
Commit: 8cb1ed5fa61816a5e27539e14e58995d4844de6e
Parents: ae46f9a
Author: sammcveety <sa...@gmail.com>
Authored: Fri Dec 23 10:49:32 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Dec 27 09:35:47 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 24 ++++++++++++--------
 .../org/apache/beam/sdk/io/PubsubIOTest.java    | 16 +++++++++++++
 2 files changed, 30 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8cb1ed5f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 9a6b534..9b5edd1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -138,7 +138,7 @@ public class PubsubIO {
    * Populate common {@link DisplayData} between Pubsub source and sink.
    */
   private static void populateCommonDisplayData(DisplayData.Builder builder,
-      String timestampLabel, String idLabel, String topic) {
+      String timestampLabel, String idLabel, ValueProvider<PubsubTopic> topic) {
     builder
         .addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)
             .withLabel("Timestamp Label Attribute"))
@@ -146,7 +146,9 @@ public class PubsubIO {
             .withLabel("ID Label Attribute"));
 
     if (topic != null) {
-      builder.add(DisplayData.item("topic", topic)
+      String topicString = topic.isAccessible() ? topic.get().asPath()
+          : topic.toString();
+      builder.add(DisplayData.item("topic", topicString)
           .withLabel("Pubsub Topic"));
     }
   }
@@ -615,6 +617,10 @@ public class PubsubIO {
        * Like {@code subscription()} but with a {@link ValueProvider}.
        */
       public Bound<T> subscription(ValueProvider<String> subscription) {
+        if (subscription.isAccessible()) {
+          // Validate.
+          PubsubSubscription.fromPath(subscription.get());
+        }
         return new Bound<>(name,
             NestedValueProvider.of(subscription, new SubscriptionTranslator()),
             topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
@@ -636,6 +642,10 @@ public class PubsubIO {
        * Like {@code topic()} but with a {@link ValueProvider}.
        */
       public Bound<T> topic(ValueProvider<String> topic) {
+        if (topic.isAccessible()) {
+          // Validate.
+          PubsubTopic.fromPath(topic.get());
+        }
         return new Bound<>(name, subscription,
             NestedValueProvider.of(topic, new TopicTranslator()),
             timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
@@ -736,11 +746,7 @@ public class PubsubIO {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         super.populateDisplayData(builder);
-        String topicString =
-            topic == null ? null
-            : topic.isAccessible() ? topic.get().asPath()
-            : topic.toString();
-        populateCommonDisplayData(builder, timestampLabel, idLabel, topicString);
+        populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
 
         builder
             .addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
@@ -1080,9 +1086,7 @@ public class PubsubIO {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         super.populateDisplayData(builder);
-        String topicString = topic.isAccessible()
-            ? topic.get().asPath() : topic.toString();
-        populateCommonDisplayData(builder, timestampLabel, idLabel, topicString);
+        populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8cb1ed5f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index b73afb2..f1bf788 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -68,6 +68,22 @@ public class PubsubIOTest {
   }
 
   @Test
+  public void testTopicValidationBadCharacter() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    PubsubIO.Read.topic("projects/my-project/topics/abc-*-abc");
+  }
+
+  @Test
+  public void testTopicValidationTooLong() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    PubsubIO.Read.topic(new StringBuilder().append("projects/my-project/topics/A-really-long-one-")
+        .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
+        .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
+        .append("1111111111111111111111111111111111111111111111111111111111111111111111111111")
+        .toString());
+  }
+
+  @Test
   public void testReadDisplayData() {
     String topic = "projects/project/topics/topic";
     String subscription = "projects/project/subscriptions/subscription";