You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/04 23:23:28 UTC

[3/4] beam git commit: [BEAM-2170] PubsubMessageWithAttributesCoder should not NPE on messages without attributes

[BEAM-2170] PubsubMessageWithAttributesCoder should not NPE on messages without attributes


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d9943a3c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d9943a3c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d9943a3c

Branch: refs/heads/master
Commit: d9943a3cbd402872053f5482cf08cb3b70416bd4
Parents: defb554
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu May 4 14:12:24 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu May 4 15:59:11 2017 -0700

----------------------------------------------------------------------
 .../PubsubMessageWithAttributesCoder.java       | 14 ++++----
 .../io/gcp/pubsub/PubsubUnboundedSinkTest.java  | 34 +++++++++++++++++---
 2 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d9943a3c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
index f70955d..e061edc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
@@ -31,10 +31,11 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 
 /** A coder for PubsubMessage including attributes. */
 public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage> {
-  private static final Coder<byte[]> PAYLOAD_CODER =
-      NullableCoder.of(ByteArrayCoder.of());
-  private static final Coder<Map<String, String>> ATTRIBUTES_CODER = MapCoder.of(
-      StringUtf8Coder.of(), StringUtf8Coder.of());
+  // A message's payload cannot be null.
+  private static final Coder<byte[]> PAYLOAD_CODER = ByteArrayCoder.of();
+  // A message's attributes can be null.
+  private static final Coder<Map<String, String>> ATTRIBUTES_CODER =
+      NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
 
   public static Coder<PubsubMessage> of(TypeDescriptor<PubsubMessage> ignored) {
     return of();
@@ -46,10 +47,7 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage>
 
   public void encode(PubsubMessage value, OutputStream outStream, Context context)
       throws IOException {
-    PAYLOAD_CODER.encode(
-        value.getPayload(),
-        outStream,
-        context.nested());
+    PAYLOAD_CODER.encode(value.getPayload(), outStream, context.nested());
     ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream, context);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d9943a3c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
index cc3c85e..e32e9a8 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
@@ -33,7 +33,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactor
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink.RecordIdMethod;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -42,7 +41,6 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -96,7 +94,6 @@ public class PubsubUnboundedSinkTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
   public void sendOneMessage() throws IOException {
     List<OutgoingMessage> outgoing =
         ImmutableList.of(new OutgoingMessage(
@@ -123,7 +120,35 @@ public class PubsubUnboundedSinkTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
+  public void sendOneMessageWithoutAttributes() throws IOException {
+    List<OutgoingMessage> outgoing =
+        ImmutableList.of(
+            new OutgoingMessage(
+                DATA.getBytes(), null /* attributes */, TIMESTAMP, getRecordId(DATA)));
+    try (PubsubTestClientFactory factory =
+        PubsubTestClient.createFactoryForPublish(
+            TOPIC, outgoing, ImmutableList.<OutgoingMessage>of())) {
+      PubsubUnboundedSink sink =
+          new PubsubUnboundedSink(
+              factory,
+              StaticValueProvider.of(TOPIC),
+              TIMESTAMP_ATTRIBUTE,
+              ID_ATTRIBUTE,
+              NUM_SHARDS,
+              1 /* batchSize */,
+              1 /* batchBytes */,
+              Duration.standardSeconds(2),
+              RecordIdMethod.DETERMINISTIC);
+      p.apply(Create.of(ImmutableList.of(DATA)))
+          .apply(ParDo.of(new Stamp(null /* attributes */)))
+          .apply(sink);
+      p.run();
+    }
+    // The PubsubTestClientFactory fails an assertion on close if the message actually
+    // published does not match the expected message.
+  }
+
+  @Test
   public void sendMoreThanOneBatchByNumMessages() throws IOException {
     List<OutgoingMessage> outgoing = new ArrayList<>();
     List<String> data = new ArrayList<>();
@@ -152,7 +177,6 @@ public class PubsubUnboundedSinkTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
   public void sendMoreThanOneBatchByByteSize() throws IOException {
     List<OutgoingMessage> outgoing = new ArrayList<>();
     List<String> data = new ArrayList<>();