You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/24 00:21:38 UTC
[1/2] beam git commit: PubSubIO: fix and improve testing for DisplayData
Repository: beam
Updated Branches:
refs/heads/master f15b52fa3 -> 338012d14
PubSubIO: fix and improve testing for DisplayData
Also adds better type/nullability checking in the code.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4406414a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4406414a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4406414a
Branch: refs/heads/master
Commit: 4406414a52e45213de5521bff4a7599b8aa53a71
Parents: f15b52f
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jan 23 10:01:22 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jan 23 16:21:23 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/PubsubIO.java | 28 ++++++++++----
.../beam/sdk/io/PubsubUnboundedSource.java | 32 +++++++++++-----
.../org/apache/beam/sdk/io/PubsubIOTest.java | 39 ++++++++++++++++----
3 files changed, 75 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4406414a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 1471953..806b7da 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -556,9 +556,10 @@ public class PubsubIO {
// Validate.
PubsubSubscription.fromPath(subscription.get());
}
- return new Read<>(name,
- NestedValueProvider.of(subscription, new SubscriptionTranslator()),
- topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
+ return new Read<>(
+ name, NestedValueProvider.of(subscription, new SubscriptionTranslator()),
+ null /* reset topic to null */, timestampLabel, coder, idLabel, maxNumRecords,
+ maxReadTime, parseFn);
}
/**
@@ -584,7 +585,7 @@ public class PubsubIO {
// Validate.
PubsubTopic.fromPath(topic.get());
}
- return new Read<>(name, subscription,
+ return new Read<>(name, null /* reset subscription to null */,
NestedValueProvider.of(topic, new TopicTranslator()),
timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
}
@@ -757,6 +758,7 @@ public class PubsubIO {
/**
* Get the topic being read from.
*/
+ @Nullable
public PubsubTopic getTopic() {
return topic == null ? null : topic.get();
}
@@ -771,6 +773,7 @@ public class PubsubIO {
/**
* Get the subscription being read from.
*/
+ @Nullable
public PubsubSubscription getSubscription() {
return subscription == null ? null : subscription.get();
}
@@ -785,6 +788,7 @@ public class PubsubIO {
/**
* Get the timestamp label.
*/
+ @Nullable
public String getTimestampLabel() {
return timestampLabel;
}
@@ -792,6 +796,7 @@ public class PubsubIO {
/**
* Get the id label.
*/
+ @Nullable
public String getIdLabel() {
return idLabel;
}
@@ -800,6 +805,7 @@ public class PubsubIO {
/**
* Get the {@link Coder} used for the transform's output.
*/
+ @Nullable
public Coder<T> getCoder() {
return coder;
}
@@ -814,6 +820,7 @@ public class PubsubIO {
/**
* Get the maximum read time.
*/
+ @Nullable
public Duration getMaxReadTime() {
return maxReadTime;
}
@@ -821,6 +828,7 @@ public class PubsubIO {
/**
* Get the parse function used for PubSub attributes.
*/
+ @Nullable
public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() {
return parseFn;
}
@@ -1074,15 +1082,17 @@ public class PubsubIO {
}
/**
- * Returns the PubSub topic being read from.
+ * Returns the PubSub topic being written to.
*/
+ @Nullable
public PubsubTopic getTopic() {
- return topic.get();
+ return (topic == null) ? null : topic.get();
}
/**
- * Returns the {@link ValueProvider} for the topic being read from.
+ * Returns the {@link ValueProvider} for the topic being written to.
*/
+ @Nullable
public ValueProvider<PubsubTopic> getTopicProvider() {
return topic;
}
@@ -1090,6 +1100,7 @@ public class PubsubIO {
/**
* Returns the timestamp label.
*/
+ @Nullable
public String getTimestampLabel() {
return timestampLabel;
}
@@ -1097,6 +1108,7 @@ public class PubsubIO {
/**
* Returns the id label.
*/
+ @Nullable
public String getIdLabel() {
return idLabel;
}
@@ -1104,6 +1116,7 @@ public class PubsubIO {
/**
* Returns the output coder.
*/
+ @Nullable
public Coder<T> getCoder() {
return coder;
}
@@ -1111,6 +1124,7 @@ public class PubsubIO {
/**
* Returns the formatting function used if publishing attributes.
*/
+ @Nullable
public SimpleFunction<T, PubsubMessage> getFormatFn() {
return formatFn;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4406414a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index c1f8720..6c8a788 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -1173,21 +1173,25 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
createAggregator("elements", Sum.ofLongs());
private final PubsubClientFactory pubsubFactory;
+ @Nullable
private final ValueProvider<SubscriptionPath> subscription;
@Nullable
+ private final ValueProvider<TopicPath> topic;
+ @Nullable
private final String timestampLabel;
@Nullable
private final String idLabel;
public StatsFn(
PubsubClientFactory pubsubFactory,
- ValueProvider<SubscriptionPath> subscription,
- @Nullable
- String timestampLabel,
- @Nullable
- String idLabel) {
+ @Nullable ValueProvider<SubscriptionPath> subscription,
+ @Nullable ValueProvider<TopicPath> topic,
+ @Nullable String timestampLabel,
+ @Nullable String idLabel) {
+ checkArgument(pubsubFactory != null, "pubsubFactory should not be null");
this.pubsubFactory = pubsubFactory;
this.subscription = subscription;
+ this.topic = topic;
this.timestampLabel = timestampLabel;
this.idLabel = idLabel;
}
@@ -1201,11 +1205,18 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@Override
public void populateDisplayData(Builder builder) {
super.populateDisplayData(builder);
- String subscriptionString =
- subscription == null ? null
- : subscription.isAccessible() ? subscription.get().getPath()
+ if (subscription != null) {
+ String subscriptionString = subscription.isAccessible()
+ ? subscription.get().getPath()
: subscription.toString();
- builder.add(DisplayData.item("subscription", subscriptionString));
+ builder.add(DisplayData.item("subscription", subscriptionString));
+ }
+ if (topic != null) {
+ String topicString = topic.isAccessible()
+ ? topic.get().getPath()
+ : topic.toString();
+ builder.add(DisplayData.item("topic", topicString));
+ }
builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
@@ -1397,7 +1408,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
return input.getPipeline().begin()
.apply(Read.from(new PubsubSource<T>(this)))
.apply("PubsubUnboundedSource.Stats",
- ParDo.of(new StatsFn<T>(pubsubFactory, subscription, timestampLabel, idLabel)));
+ ParDo.of(new StatsFn<T>(
+ pubsubFactory, subscription, topic, timestampLabel, idLabel)));
}
private SubscriptionPath createRandomSubscription(PipelineOptions options) {
http://git-wip-us.apache.org/repos/asf/beam/blob/4406414a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index e15562e..a0d58ea 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -88,13 +88,12 @@ public class PubsubIOTest {
}
@Test
- public void testReadDisplayData() {
+ public void testReadTopicDisplayData() {
String topic = "projects/project/topics/topic";
String subscription = "projects/project/subscriptions/subscription";
Duration maxReadTime = Duration.standardMinutes(5);
PubsubIO.Read<String> read = PubsubIO.<String>read()
.topic(StaticValueProvider.of(topic))
- .subscription(StaticValueProvider.of(subscription))
.timestampLabel("myTimestamp")
.idLabel("myId")
.maxNumRecords(1234)
@@ -103,6 +102,26 @@ public class PubsubIOTest {
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("topic", topic));
+ assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
+ assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+ assertThat(displayData, hasDisplayItem("maxNumRecords", 1234));
+ assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
+ }
+
+ @Test
+ public void testReadSubscriptionDisplayData() {
+ String topic = "projects/project/topics/topic";
+ String subscription = "projects/project/subscriptions/subscription";
+ Duration maxReadTime = Duration.standardMinutes(5);
+ PubsubIO.Read<String> read = PubsubIO.<String>read()
+ .subscription(StaticValueProvider.of(subscription))
+ .timestampLabel("myTimestamp")
+ .idLabel("myId")
+ .maxNumRecords(1234)
+ .maxReadTime(maxReadTime);
+
+ DisplayData displayData = DisplayData.from(read);
+
assertThat(displayData, hasDisplayItem("subscription", subscription));
assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
assertThat(displayData, hasDisplayItem("idLabel", "myId"));
@@ -134,14 +153,20 @@ public class PubsubIOTest {
@Category(RunnableOnService.class)
public void testPrimitiveReadDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PubsubIO.Read<String> read =
- PubsubIO.<String>read().subscription("projects/project/subscriptions/subscription")
- .maxNumRecords(1)
- .withCoder(StringUtf8Coder.of());
+ Set<DisplayData> displayData;
+ PubsubIO.Read<String> read = PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
+ // Reading from a subscription.
+ read = read.subscription("projects/project/subscriptions/subscription");
+ displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat("PubsubIO.Read should include the subscription in its primitive display data",
displayData, hasItem(hasDisplayItem("subscription")));
+
+ // Reading from a topic.
+ read = read.topic("projects/project/topics/topic");
+ displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
+ assertThat("PubsubIO.Read should include the topic in its primitive display data",
+ displayData, hasItem(hasDisplayItem("topic")));
}
@Test
[2/2] beam git commit: This closes #1817
Posted by dh...@apache.org.
This closes #1817
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/338012d1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/338012d1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/338012d1
Branch: refs/heads/master
Commit: 338012d14c2d447f88053641b2cd14de93376c85
Parents: f15b52f 4406414
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jan 23 16:21:29 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jan 23 16:21:29 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/PubsubIO.java | 28 ++++++++++----
.../beam/sdk/io/PubsubUnboundedSource.java | 32 +++++++++++-----
.../org/apache/beam/sdk/io/PubsubIOTest.java | 39 ++++++++++++++++----
3 files changed, 75 insertions(+), 24 deletions(-)
----------------------------------------------------------------------