You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/24 00:21:38 UTC

[1/2] beam git commit: PubSubIO: fix and improve testing for DisplayData

Repository: beam
Updated Branches:
  refs/heads/master f15b52fa3 -> 338012d14


PubSubIO: fix and improve testing for DisplayData

Also adds better type/nullability checking in the code: @Nullable annotations on
getters, fields, and constructor parameters, plus explicit null checks.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4406414a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4406414a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4406414a

Branch: refs/heads/master
Commit: 4406414a52e45213de5521bff4a7599b8aa53a71
Parents: f15b52f
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jan 23 10:01:22 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jan 23 16:21:23 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 28 ++++++++++----
 .../beam/sdk/io/PubsubUnboundedSource.java      | 32 +++++++++++-----
 .../org/apache/beam/sdk/io/PubsubIOTest.java    | 39 ++++++++++++++++----
 3 files changed, 75 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4406414a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 1471953..806b7da 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -556,9 +556,10 @@ public class PubsubIO {
         // Validate.
         PubsubSubscription.fromPath(subscription.get());
       }
-      return new Read<>(name,
-          NestedValueProvider.of(subscription, new SubscriptionTranslator()),
-          topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
+      return new Read<>(
+          name, NestedValueProvider.of(subscription, new SubscriptionTranslator()),
+          null /* reset topic to null */, timestampLabel, coder, idLabel, maxNumRecords,
+          maxReadTime, parseFn);
     }
 
     /**
@@ -584,7 +585,7 @@ public class PubsubIO {
         // Validate.
         PubsubTopic.fromPath(topic.get());
       }
-      return new Read<>(name, subscription,
+      return new Read<>(name, null /* reset subscription to null */,
           NestedValueProvider.of(topic, new TopicTranslator()),
           timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
     }
@@ -757,6 +758,7 @@ public class PubsubIO {
     /**
      * Get the topic being read from.
      */
+    @Nullable
     public PubsubTopic getTopic() {
       return topic == null ? null : topic.get();
     }
@@ -771,6 +773,7 @@ public class PubsubIO {
     /**
      * Get the subscription being read from.
      */
+    @Nullable
     public PubsubSubscription getSubscription() {
       return subscription == null ? null : subscription.get();
     }
@@ -785,6 +788,7 @@ public class PubsubIO {
     /**
      * Get the timestamp label.
      */
+    @Nullable
     public String getTimestampLabel() {
       return timestampLabel;
     }
@@ -792,6 +796,7 @@ public class PubsubIO {
     /**
      * Get the id label.
      */
+    @Nullable
     public String getIdLabel() {
       return idLabel;
     }
@@ -800,6 +805,7 @@ public class PubsubIO {
     /**
      * Get the {@link Coder} used for the transform's output.
      */
+    @Nullable
     public Coder<T> getCoder() {
       return coder;
     }
@@ -814,6 +820,7 @@ public class PubsubIO {
     /**
      * Get the maximum read time.
      */
+    @Nullable
     public Duration getMaxReadTime() {
       return maxReadTime;
     }
@@ -821,6 +828,7 @@ public class PubsubIO {
     /**
      * Get the parse function used for PubSub attributes.
      */
+    @Nullable
     public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() {
       return parseFn;
     }
@@ -1074,15 +1082,17 @@ public class PubsubIO {
     }
 
     /**
-     * Returns the PubSub topic being read from.
+     * Returns the PubSub topic being written to.
      */
+    @Nullable
     public PubsubTopic getTopic() {
-      return topic.get();
+      return (topic == null) ? null : topic.get();
     }
 
     /**
-     * Returns the {@link ValueProvider} for the topic being read from.
+     * Returns the {@link ValueProvider} for the topic being written to.
      */
+    @Nullable
     public ValueProvider<PubsubTopic> getTopicProvider() {
       return topic;
     }
@@ -1090,6 +1100,7 @@ public class PubsubIO {
     /**
      * Returns the timestamp label.
      */
+    @Nullable
     public String getTimestampLabel() {
       return timestampLabel;
     }
@@ -1097,6 +1108,7 @@ public class PubsubIO {
     /**
      * Returns the id label.
      */
+    @Nullable
     public String getIdLabel() {
       return idLabel;
     }
@@ -1104,6 +1116,7 @@ public class PubsubIO {
     /**
      * Returns the output coder.
      */
+    @Nullable
     public Coder<T> getCoder() {
       return coder;
     }
@@ -1111,6 +1124,7 @@ public class PubsubIO {
     /**
      * Returns the formatting function used if publishing attributes.
      */
+    @Nullable
     public SimpleFunction<T, PubsubMessage> getFormatFn() {
       return formatFn;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/4406414a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index c1f8720..6c8a788 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -1173,21 +1173,25 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
         createAggregator("elements", Sum.ofLongs());
 
     private final PubsubClientFactory pubsubFactory;
+    @Nullable
     private final ValueProvider<SubscriptionPath> subscription;
     @Nullable
+    private final ValueProvider<TopicPath> topic;
+    @Nullable
     private final String timestampLabel;
     @Nullable
     private final String idLabel;
 
     public StatsFn(
         PubsubClientFactory pubsubFactory,
-        ValueProvider<SubscriptionPath> subscription,
-        @Nullable
-            String timestampLabel,
-        @Nullable
-            String idLabel) {
+        @Nullable ValueProvider<SubscriptionPath> subscription,
+        @Nullable ValueProvider<TopicPath> topic,
+        @Nullable String timestampLabel,
+        @Nullable String idLabel) {
+      checkArgument(pubsubFactory != null, "pubsubFactory should not be null");
       this.pubsubFactory = pubsubFactory;
       this.subscription = subscription;
+      this.topic = topic;
       this.timestampLabel = timestampLabel;
       this.idLabel = idLabel;
     }
@@ -1201,11 +1205,18 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     @Override
     public void populateDisplayData(Builder builder) {
       super.populateDisplayData(builder);
-        String subscriptionString =
-            subscription == null ? null
-            : subscription.isAccessible() ? subscription.get().getPath()
+      if (subscription != null) {
+        String subscriptionString = subscription.isAccessible()
+            ? subscription.get().getPath()
             : subscription.toString();
-      builder.add(DisplayData.item("subscription", subscriptionString));
+        builder.add(DisplayData.item("subscription", subscriptionString));
+      }
+      if (topic != null) {
+        String topicString = topic.isAccessible()
+            ? topic.get().getPath()
+            : topic.toString();
+        builder.add(DisplayData.item("topic", topicString));
+      }
       builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
       builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
       builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
@@ -1397,7 +1408,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     return input.getPipeline().begin()
                 .apply(Read.from(new PubsubSource<T>(this)))
                 .apply("PubsubUnboundedSource.Stats",
-                    ParDo.of(new StatsFn<T>(pubsubFactory, subscription, timestampLabel, idLabel)));
+                    ParDo.of(new StatsFn<T>(
+                        pubsubFactory, subscription, topic, timestampLabel, idLabel)));
   }
 
   private SubscriptionPath createRandomSubscription(PipelineOptions options) {

http://git-wip-us.apache.org/repos/asf/beam/blob/4406414a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index e15562e..a0d58ea 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -88,13 +88,12 @@ public class PubsubIOTest {
   }
 
   @Test
-  public void testReadDisplayData() {
+  public void testReadTopicDisplayData() {
     String topic = "projects/project/topics/topic";
     String subscription = "projects/project/subscriptions/subscription";
     Duration maxReadTime = Duration.standardMinutes(5);
     PubsubIO.Read<String> read = PubsubIO.<String>read()
         .topic(StaticValueProvider.of(topic))
-        .subscription(StaticValueProvider.of(subscription))
         .timestampLabel("myTimestamp")
         .idLabel("myId")
         .maxNumRecords(1234)
@@ -103,6 +102,26 @@ public class PubsubIOTest {
     DisplayData displayData = DisplayData.from(read);
 
     assertThat(displayData, hasDisplayItem("topic", topic));
+    assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
+    assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+    assertThat(displayData, hasDisplayItem("maxNumRecords", 1234));
+    assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
+  }
+
+  @Test
+  public void testReadSubscriptionDisplayData() {
+    String topic = "projects/project/topics/topic";
+    String subscription = "projects/project/subscriptions/subscription";
+    Duration maxReadTime = Duration.standardMinutes(5);
+    PubsubIO.Read<String> read = PubsubIO.<String>read()
+        .subscription(StaticValueProvider.of(subscription))
+        .timestampLabel("myTimestamp")
+        .idLabel("myId")
+        .maxNumRecords(1234)
+        .maxReadTime(maxReadTime);
+
+    DisplayData displayData = DisplayData.from(read);
+
     assertThat(displayData, hasDisplayItem("subscription", subscription));
     assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
     assertThat(displayData, hasDisplayItem("idLabel", "myId"));
@@ -134,14 +153,20 @@ public class PubsubIOTest {
   @Category(RunnableOnService.class)
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PubsubIO.Read<String> read =
-        PubsubIO.<String>read().subscription("projects/project/subscriptions/subscription")
-            .maxNumRecords(1)
-            .withCoder(StringUtf8Coder.of());
+    Set<DisplayData> displayData;
+    PubsubIO.Read<String> read = PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
 
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
+    // Reading from a subscription.
+    read = read.subscription("projects/project/subscriptions/subscription");
+    displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
     assertThat("PubsubIO.Read should include the subscription in its primitive display data",
         displayData, hasItem(hasDisplayItem("subscription")));
+
+    // Reading from a topic.
+    read = read.topic("projects/project/topics/topic");
+    displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
+    assertThat("PubsubIO.Read should include the topic in its primitive display data",
+        displayData, hasItem(hasDisplayItem("topic")));
   }
 
   @Test


[2/2] beam git commit: This closes #1817

Posted by dh...@apache.org.
This closes #1817


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/338012d1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/338012d1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/338012d1

Branch: refs/heads/master
Commit: 338012d14c2d447f88053641b2cd14de93376c85
Parents: f15b52f 4406414
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jan 23 16:21:29 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jan 23 16:21:29 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 28 ++++++++++----
 .../beam/sdk/io/PubsubUnboundedSource.java      | 32 +++++++++++-----
 .../org/apache/beam/sdk/io/PubsubIOTest.java    | 39 ++++++++++++++++----
 3 files changed, 75 insertions(+), 24 deletions(-)
----------------------------------------------------------------------