You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/29 22:31:58 UTC

[6/9] beam git commit: Adds PubsubIO.readStrings(), readProtos(), readAvros()

Adds PubsubIO.readStrings(), readProtos(), readAvros()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/079353d5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/079353d5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/079353d5

Branch: refs/heads/master
Commit: 079353d58c65141683e4640e425ee610001e7718
Parents: 42c975e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:50:43 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/complete/game/GameStats.java  |  6 ++---
 .../examples/complete/game/LeaderBoard.java     |  6 ++---
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 28 ++++++++++++++++++++
 .../io/gcp/pubsub/PubsubUnboundedSource.java    |  4 ++-
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java    |  3 +--
 5 files changed, 36 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index d95eb06..d628497 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -24,7 +24,6 @@ import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
@@ -252,9 +251,8 @@ public class GameStats extends LeaderBoard {
 
     // Read Events from Pub/Sub using custom timestamps
     PCollection<GameActionInfo> rawEvents = pipeline
-        .apply(PubsubIO.<String>read()
-            .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic())
-            .withCoder(StringUtf8Coder.of()))
+        .apply(PubsubIO.readStrings()
+            .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 
     // Extract username/score pairs from the event stream

http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index a87468a..fbffac6 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -27,7 +27,6 @@ import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
 import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -191,9 +190,8 @@ public class LeaderBoard extends HourlyTeamScore {
     // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
     // data elements, and parse the data.
     PCollection<GameActionInfo> gameEvents = pipeline
-        .apply(PubsubIO.<String>read()
-            .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic())
-            .withCoder(StringUtf8Coder.of()))
+        .apply(PubsubIO.readStrings()
+            .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 
     gameEvents.apply("CalculateTeamScores",

http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 99df3c6..9604864 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
+import com.google.protobuf.Message;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -29,8 +30,11 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
@@ -461,6 +465,30 @@ public class PubsubIO {
     return new AutoValue_PubsubIO_Read.Builder<T>().build();
   }
 
+  /**
+   * Returns A {@link PTransform} that continuously reads UTF-8 encoded strings from a Google Cloud
+   * Pub/Sub stream.
+   */
+  public static Read<String> readStrings() {
+    return PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
+  }
+
+  /**
+   * Returns A {@link PTransform} that continuously reads binary encoded protos of the given type
+   * from a Google Cloud Pub/Sub stream.
+   */
+  public static <T extends Message> Read<T> readProtos(Class<T> messageClass) {
+    return PubsubIO.<T>read().withCoder(ProtoCoder.of(messageClass));
+  }
+
+  /**
+   * Returns A {@link PTransform} that continuously reads binary encoded Avro messages of the
+   * given type from a Google Cloud Pub/Sub stream.
+   */
+  public static <T extends Message> Read<T> readAvros(Class<T> clazz) {
+    return PubsubIO.<T>read().withCoder(AvroCoder.of(clazz));
+  }
+
   /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */
   public static <T> Write<T> write() {
     return new AutoValue_PubsubIO_Write.Builder<T>().build();

http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index b16b665..6392fd2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -82,7 +82,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A PTransform which streams messages from Pubsub.
+ * Users should use {@link PubsubIO#read} instead.
+ *
+ * <p>A PTransform which streams messages from Pubsub.
  * <ul>
  * <li>The underlying implementation in an {@link UnboundedSource} which receives messages
  * in batches and hands them out one at a time.

http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index f896bfc..20039d4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 import java.util.Set;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
 import org.apache.beam.sdk.testing.ValidatesRunner;
@@ -146,7 +145,7 @@ public class PubsubIOTest {
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     Set<DisplayData> displayData;
-    PubsubIO.Read<String> baseRead = PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
+    PubsubIO.Read<String> baseRead = PubsubIO.readStrings();
 
     // Reading from a subscription.
     PubsubIO.Read<String> read =