You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/04 23:23:27 UTC

[2/4] beam git commit: Makes PubsubMessagePayloadOnlyCoder not require whole-stream context

Makes PubsubMessagePayloadOnlyCoder not require whole-stream context

Now that PubsubIO.Read can directly read PubsubMessage's,
they should be treated as first-class PCollection elements, and
they can be encoded/decoded in any contexts.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9b7fe44
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9b7fe44
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9b7fe44

Branch: refs/heads/master
Commit: c9b7fe443368badf6fd9fbd08f5234f17766c2cf
Parents: d9943a3
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu May 4 14:40:08 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu May 4 15:59:11 2017 -0700

----------------------------------------------------------------------
 .../io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java   | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c9b7fe44/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
index 81c1a45..d120f72 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
@@ -17,17 +17,18 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsub;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.util.StreamUtils;
 
 /** A coder for PubsubMessage treating the raw bytes being decoded as the message's payload. */
 public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubMessage> {
+  private static final Coder<byte[]> PAYLOAD_CODER = ByteArrayCoder.of();
+
   public static PubsubMessagePayloadOnlyCoder of() {
     return new PubsubMessagePayloadOnlyCoder();
   }
@@ -35,14 +36,12 @@ public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubMessage> {
   @Override
   public void encode(PubsubMessage value, OutputStream outStream, Context context)
       throws IOException {
-    checkState(context.isWholeStream, "Expected to only be used in a whole-stream context");
-    outStream.write(value.getPayload());
+    PAYLOAD_CODER.encode(value.getPayload(), outStream, context);
   }
 
   @Override
   public PubsubMessage decode(InputStream inStream, Context context) throws IOException {
-    checkState(context.isWholeStream, "Expected to only be used in a whole-stream context");
     return new PubsubMessage(
-        StreamUtils.getBytes(inStream), ImmutableMap.<String, String>of());
+        PAYLOAD_CODER.decode(inStream, context), ImmutableMap.<String, String>of());
   }
 }