You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/04 23:23:28 UTC
[3/4] beam git commit: [BEAM-2170] PubsubMessageWithAttributesCoder
should not NPE on messages without attributes
[BEAM-2170] PubsubMessageWithAttributesCoder should not NPE on messages without attributes
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d9943a3c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d9943a3c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d9943a3c
Branch: refs/heads/master
Commit: d9943a3cbd402872053f5482cf08cb3b70416bd4
Parents: defb554
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu May 4 14:12:24 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu May 4 15:59:11 2017 -0700
----------------------------------------------------------------------
.../PubsubMessageWithAttributesCoder.java | 14 ++++----
.../io/gcp/pubsub/PubsubUnboundedSinkTest.java | 34 +++++++++++++++++---
2 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d9943a3c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
index f70955d..e061edc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
@@ -31,10 +31,11 @@ import org.apache.beam.sdk.values.TypeDescriptor;
/** A coder for PubsubMessage including attributes. */
public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage> {
- private static final Coder<byte[]> PAYLOAD_CODER =
- NullableCoder.of(ByteArrayCoder.of());
- private static final Coder<Map<String, String>> ATTRIBUTES_CODER = MapCoder.of(
- StringUtf8Coder.of(), StringUtf8Coder.of());
+ // A message's payload can not be null
+ private static final Coder<byte[]> PAYLOAD_CODER = ByteArrayCoder.of();
+ // A message's attributes can be null.
+ private static final Coder<Map<String, String>> ATTRIBUTES_CODER =
+ NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
public static Coder<PubsubMessage> of(TypeDescriptor<PubsubMessage> ignored) {
return of();
@@ -46,10 +47,7 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage>
public void encode(PubsubMessage value, OutputStream outStream, Context context)
throws IOException {
- PAYLOAD_CODER.encode(
- value.getPayload(),
- outStream,
- context.nested());
+ PAYLOAD_CODER.encode(value.getPayload(), outStream, context.nested());
ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream, context);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d9943a3c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
index cc3c85e..e32e9a8 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
@@ -33,7 +33,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactor
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink.RecordIdMethod;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -42,7 +41,6 @@ import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -96,7 +94,6 @@ public class PubsubUnboundedSinkTest implements Serializable {
}
@Test
- @Category(NeedsRunner.class)
public void sendOneMessage() throws IOException {
List<OutgoingMessage> outgoing =
ImmutableList.of(new OutgoingMessage(
@@ -123,7 +120,35 @@ public class PubsubUnboundedSinkTest implements Serializable {
}
@Test
- @Category(NeedsRunner.class)
+ public void sendOneMessageWithoutAttributes() throws IOException {
+ List<OutgoingMessage> outgoing =
+ ImmutableList.of(
+ new OutgoingMessage(
+ DATA.getBytes(), null /* attributes */, TIMESTAMP, getRecordId(DATA)));
+ try (PubsubTestClientFactory factory =
+ PubsubTestClient.createFactoryForPublish(
+ TOPIC, outgoing, ImmutableList.<OutgoingMessage>of())) {
+ PubsubUnboundedSink sink =
+ new PubsubUnboundedSink(
+ factory,
+ StaticValueProvider.of(TOPIC),
+ TIMESTAMP_ATTRIBUTE,
+ ID_ATTRIBUTE,
+ NUM_SHARDS,
+ 1 /* batchSize */,
+ 1 /* batchBytes */,
+ Duration.standardSeconds(2),
+ RecordIdMethod.DETERMINISTIC);
+ p.apply(Create.of(ImmutableList.of(DATA)))
+ .apply(ParDo.of(new Stamp(null /* attributes */)))
+ .apply(sink);
+ p.run();
+ }
+ // The PubsubTestClientFactory will assert fail on close if the actual published
+ // message does not match the expected publish message.
+ }
+
+ @Test
public void sendMoreThanOneBatchByNumMessages() throws IOException {
List<OutgoingMessage> outgoing = new ArrayList<>();
List<String> data = new ArrayList<>();
@@ -152,7 +177,6 @@ public class PubsubUnboundedSinkTest implements Serializable {
}
@Test
- @Category(NeedsRunner.class)
public void sendMoreThanOneBatchByByteSize() throws IOException {
List<OutgoingMessage> outgoing = new ArrayList<>();
List<String> data = new ArrayList<>();