You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/04 23:23:27 UTC
[2/4] beam git commit: Makes PubsubMessagePayloadOnlyCoder not require whole-stream context
Makes PubsubMessagePayloadOnlyCoder not require whole-stream context
Now that PubsubIO.Read can read PubsubMessages directly,
they should be treated as first-class PCollection elements
that can be encoded and decoded in any context.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9b7fe44
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9b7fe44
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9b7fe44
Branch: refs/heads/master
Commit: c9b7fe443368badf6fd9fbd08f5234f17766c2cf
Parents: d9943a3
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu May 4 14:40:08 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu May 4 15:59:11 2017 -0700
----------------------------------------------------------------------
.../io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java | 13 ++++++-------
1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c9b7fe44/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
index 81c1a45..d120f72 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
@@ -17,17 +17,18 @@
*/
package org.apache.beam.sdk.io.gcp.pubsub;
-import static com.google.common.base.Preconditions.checkState;
-
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.util.StreamUtils;
/** A coder for PubsubMessage treating the raw bytes being decoded as the message's payload. */
public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubMessage> {
+ private static final Coder<byte[]> PAYLOAD_CODER = ByteArrayCoder.of();
+
public static PubsubMessagePayloadOnlyCoder of() {
return new PubsubMessagePayloadOnlyCoder();
}
@@ -35,14 +36,12 @@ public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubMessage> {
@Override
public void encode(PubsubMessage value, OutputStream outStream, Context context)
throws IOException {
- checkState(context.isWholeStream, "Expected to only be used in a whole-stream context");
- outStream.write(value.getPayload());
+ PAYLOAD_CODER.encode(value.getPayload(), outStream, context);
}
@Override
public PubsubMessage decode(InputStream inStream, Context context) throws IOException {
- checkState(context.isWholeStream, "Expected to only be used in a whole-stream context");
return new PubsubMessage(
- StreamUtils.getBytes(inStream), ImmutableMap.<String, String>of());
+ PAYLOAD_CODER.decode(inStream, context), ImmutableMap.<String, String>of());
}
}