You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/29 22:31:58 UTC
[6/9] beam git commit: Adds PubsubIO.readStrings(), readProtos(),
readAvros()
Adds PubsubIO.readStrings(), readProtos(), readAvros()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/079353d5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/079353d5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/079353d5
Branch: refs/heads/master
Commit: 079353d58c65141683e4640e425ee610001e7718
Parents: 42c975e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:50:43 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 6 ++---
.../examples/complete/game/LeaderBoard.java | 6 ++---
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 28 ++++++++++++++++++++
.../io/gcp/pubsub/PubsubUnboundedSource.java | 4 ++-
.../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 3 +--
5 files changed, 36 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index d95eb06..d628497 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -24,7 +24,6 @@ import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
@@ -252,9 +251,8 @@ public class GameStats extends LeaderBoard {
// Read Events from Pub/Sub using custom timestamps
PCollection<GameActionInfo> rawEvents = pipeline
- .apply(PubsubIO.<String>read()
- .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic())
- .withCoder(StringUtf8Coder.of()))
+ .apply(PubsubIO.readStrings()
+ .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
// Extract username/score pairs from the event stream
http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index a87468a..fbffac6 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -27,7 +27,6 @@ import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -191,9 +190,8 @@ public class LeaderBoard extends HourlyTeamScore {
// Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
// data elements, and parse the data.
PCollection<GameActionInfo> gameEvents = pipeline
- .apply(PubsubIO.<String>read()
- .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic())
- .withCoder(StringUtf8Coder.of()))
+ .apply(PubsubIO.readStrings()
+ .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
gameEvents.apply("CalculateTeamScores",
http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 99df3c6..9604864 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
+import com.google.protobuf.Message;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -29,8 +30,11 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
@@ -461,6 +465,30 @@ public class PubsubIO {
return new AutoValue_PubsubIO_Read.Builder<T>().build();
}
+ /**
+ * Returns a {@link PTransform} that continuously reads UTF-8 encoded strings from a Google Cloud
+ * Pub/Sub stream; each message payload is decoded with {@link StringUtf8Coder}.
+ */
+ public static Read<String> readStrings() {
+ return PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
+ }
+
+ /**
+ * Returns a {@link PTransform} that continuously reads binary encoded protos of the given type
+ * from a Google Cloud Pub/Sub stream, decoding each payload with {@link ProtoCoder}.
+ */
+ public static <T extends Message> Read<T> readProtos(Class<T> messageClass) {
+ return PubsubIO.<T>read().withCoder(ProtoCoder.of(messageClass));
+ }
+
+ /**
+ * Returns a {@link PTransform} that continuously reads binary encoded Avro messages of the
+ * given type (decoded with {@link AvroCoder}; no protobuf bound) from a Google Cloud Pub/Sub stream.
+ */
+ public static <T> Read<T> readAvros(Class<T> clazz) {
+ return PubsubIO.<T>read().withCoder(AvroCoder.of(clazz));
+ }
+
/** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */
public static <T> Write<T> write() {
return new AutoValue_PubsubIO_Write.Builder<T>().build();
http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index b16b665..6392fd2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -82,7 +82,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A PTransform which streams messages from Pubsub.
+ * Users should use {@link PubsubIO#read} instead.
+ *
+ * <p>A PTransform which streams messages from Pubsub.
* <ul>
* <li>The underlying implementation in an {@link UnboundedSource} which receives messages
* in batches and hands them out one at a time.
http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index f896bfc..20039d4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import java.util.Set;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
import org.apache.beam.sdk.testing.ValidatesRunner;
@@ -146,7 +145,7 @@ public class PubsubIOTest {
public void testPrimitiveReadDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
Set<DisplayData> displayData;
- PubsubIO.Read<String> baseRead = PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
+ PubsubIO.Read<String> baseRead = PubsubIO.readStrings();
// Reading from a subscription.
PubsubIO.Read<String> read =