You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/12 16:57:05 UTC
[2/2] incubator-beam git commit: [BEAM-53] Add PubsubUnboundedSink
and tests
[BEAM-53] Add PubsubUnboundedSink and tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/040f8f98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/040f8f98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/040f8f98
Branch: refs/heads/master
Commit: 040f8f986bb22c8adbe29b1dbb8465bf38d9476f
Parents: 123674f
Author: Mark Shields <ma...@google.com>
Authored: Tue Apr 26 18:42:00 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 12 09:56:52 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 392 +++++++++++++++++++
.../beam/sdk/util/PubsubApiaryClient.java | 14 +-
.../org/apache/beam/sdk/util/PubsubClient.java | 19 +-
.../apache/beam/sdk/util/PubsubGrpcClient.java | 59 +--
.../apache/beam/sdk/util/PubsubTestClient.java | 10 +
.../beam/sdk/io/PubsubUnboundedSinkTest.java | 145 +++++++
6 files changed, 612 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/040f8f98/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
new file mode 100644
index 0000000..6d08a70
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import javax.annotation.Nullable;
+
+/**
+ * A PTransform which streams messages to Pubsub.
+ * <ul>
+ * <li>The underlying implementation is just a {@link GroupByKey} followed by a {@link ParDo} which
+ * publishes as a side effect. (In the future we want to design and switch to a custom
+ * {@code UnboundedSink} implementation so as to gain access to system watermark and
+ * end-of-pipeline cleanup.)
+ * <li>We try to send messages in batches while also limiting send latency.
+ * <li>No stats are logged. Rather some counters are used to keep track of elements and batches.
+ * <li>Though some background threads are used by the underlying netty system all actual Pubsub
+ * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances
+ * to execute concurrently and hide latency.
+ * <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer
+ * to dedup messages.
+ * </ul>
+ */
+public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
+ private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSink.class);
+
+ /**
+ * Default maximum number of messages per publish.
+ */
+ private static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000;
+
+ /**
+ * Default maximum size of a publish batch, in bytes.
+ */
+ private static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000;
+
+ /**
+ * Default longest delay between receiving a message and pushing it to Pubsub.
+ */
+ private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2);
+
+ /**
+ * Coder for conveying outgoing messages between internal stages.
+ */
+ private static class OutgoingMessageCoder extends CustomCoder<OutgoingMessage> {
+ @Override
+ public void encode(
+ OutgoingMessage value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
+ ByteArrayCoder.of().encode(value.elementBytes, outStream, Context.NESTED);
+ BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, Context.NESTED);
+ }
+
+ @Override
+ public OutgoingMessage decode(
+ InputStream inStream, Context context) throws CoderException, IOException {
+ byte[] elementBytes = ByteArrayCoder.of().decode(inStream, Context.NESTED);
+ long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, Context.NESTED);
+ return new OutgoingMessage(elementBytes, timestampMsSinceEpoch);
+ }
+ }
+
+ @VisibleForTesting
+ static final Coder<OutgoingMessage> CODER = new OutgoingMessageCoder();
+
+ // ================================================================================
+ // ShardFn
+ // ================================================================================
+
+ /**
+ * Convert elements to messages and shard them.
+ */
+ private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> {
+ private final Aggregator<Long, Long> elementCounter =
+ createAggregator("elements", new Sum.SumLongFn());
+ private final Coder<T> elementCoder;
+ private final int numShards;
+
+ ShardFn(Coder<T> elementCoder, int numShards) {
+ this.elementCoder = elementCoder;
+ this.numShards = numShards;
+ }
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ elementCounter.addValue(1L);
+ byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
+ long timestampMsSinceEpoch = c.timestamp().getMillis();
+ c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards),
+ new OutgoingMessage(elementBytes, timestampMsSinceEpoch)));
+ }
+
+ @Override
+ public void populateDisplayData(Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("numShards", numShards));
+ }
+ }
+
+ // ================================================================================
+ // WriterFn
+ // ================================================================================
+
+ /**
+ * Publish messages to Pubsub in batches.
+ */
+ private static class WriterFn
+ extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
+ private final PubsubClientFactory pubsubFactory;
+ private final TopicPath topic;
+ private final String timestampLabel;
+ private final String idLabel;
+ private final int publishBatchSize;
+ private final int publishBatchBytes;
+
+ /**
+ * Client on which to talk to Pubsub. Null until created by {@link #startBundle}.
+ */
+ @Nullable
+ private transient PubsubClient pubsubClient;
+
+ private final Aggregator<Long, Long> batchCounter =
+ createAggregator("batches", new Sum.SumLongFn());
+ private final Aggregator<Long, Long> elementCounter =
+ createAggregator("elements", new Sum.SumLongFn());
+ private final Aggregator<Long, Long> byteCounter =
+ createAggregator("bytes", new Sum.SumLongFn());
+
+ WriterFn(
+ PubsubClientFactory pubsubFactory, TopicPath topic, String timestampLabel,
+ String idLabel, int publishBatchSize, int publishBatchBytes) {
+ this.pubsubFactory = pubsubFactory;
+ this.topic = topic;
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
+ this.publishBatchSize = publishBatchSize;
+ this.publishBatchBytes = publishBatchBytes;
+ }
+
+ /**
+ * BLOCKING
+ * Send {@code messages} as a batch to Pubsub; throws if not all of them were accepted.
+ */
+ private void publishBatch(List<OutgoingMessage> messages, int bytes)
+ throws IOException {
+ int n = pubsubClient.publish(topic, messages);
+ // Guava's checkState only substitutes %s placeholders; %d would be printed literally.
+ checkState(n == messages.size(), "Attempted to publish %s messages but %s were successful",
+ messages.size(), n);
+ batchCounter.addValue(1L);
+ elementCounter.addValue((long) messages.size());
+ byteCounter.addValue((long) bytes);
+ }
+
+ @Override
+ public void startBundle(Context c) throws Exception {
+ checkState(pubsubClient == null, "startBundle invoked without prior finishBundle");
+ pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel,
+ c.getPipelineOptions().as(PubsubOptions.class));
+ }
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize);
+ int bytes = 0;
+ for (OutgoingMessage message : c.element().getValue()) {
+ if (!pubsubMessages.isEmpty()
+ && bytes + message.elementBytes.length > publishBatchBytes) {
+ // Break large (in bytes) batches into smaller.
+ // (We've already broken by batch size using the trigger set up in apply(), though that
+ // may run slightly over the actual publishBatchSize. We'll consider that ok since
+ // the hard limit from Pubsub is by bytes rather than number of messages.)
+ // BLOCKS until published.
+ publishBatch(pubsubMessages, bytes);
+ pubsubMessages.clear();
+ bytes = 0;
+ }
+ pubsubMessages.add(message);
+ bytes += message.elementBytes.length;
+ }
+ if (!pubsubMessages.isEmpty()) {
+ // BLOCKS until published.
+ publishBatch(pubsubMessages, bytes);
+ }
+ }
+
+ @Override
+ public void finishBundle(Context c) throws Exception {
+ pubsubClient.close();
+ pubsubClient = null;
+ }
+
+ @Override
+ public void populateDisplayData(Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("topic", topic.getPath()));
+ builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
+ builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
+ builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
+ }
+ }
+
+ // ================================================================================
+ // PubsubUnboundedSink
+ // ================================================================================
+
+ /**
+ * Which factory to use for creating Pubsub transport.
+ */
+ private final PubsubClientFactory pubsubFactory;
+
+ /**
+ * Pubsub topic to publish to.
+ */
+ private final TopicPath topic;
+
+ /**
+ * Coder for elements. It is the responsibility of the underlying Pubsub transport to
+ * re-encode element bytes if necessary, eg as Base64 strings.
+ */
+ private final Coder<T> elementCoder;
+
+ /**
+ * Pubsub metadata field holding the timestamp of each element, or {@literal null} if the
+ * Pubsub message publish timestamp should be used instead.
+ */
+ @Nullable
+ private final String timestampLabel;
+
+ /**
+ * Pubsub metadata field holding the id for each element, or {@literal null} if we need to
+ * generate a unique id ourselves.
+ */
+ @Nullable
+ private final String idLabel;
+
+ /**
+ * Number of 'shards' to use so that latency in Pubsub publish can be hidden. Generally this
+ * should be a small multiple of the number of available cores. Too small a number results
+ * in too much time lost to blocking Pubsub calls. Too large a number results in too many
+ * single-element batches being sent to Pubsub with high per-batch overhead.
+ */
+ private final int numShards;
+
+ /**
+ * Maximum number of messages per publish.
+ */
+ private final int publishBatchSize;
+
+ /**
+ * Maximum size of a publish batch, in bytes.
+ */
+ private final int publishBatchBytes;
+
+ /**
+ * Longest delay between receiving a message and pushing it to Pubsub.
+ */
+ private final Duration maxLatency;
+
+ @VisibleForTesting
+ PubsubUnboundedSink(
+ PubsubClientFactory pubsubFactory,
+ TopicPath topic,
+ Coder<T> elementCoder,
+ String timestampLabel,
+ String idLabel,
+ int numShards,
+ int publishBatchSize,
+ int publishBatchBytes,
+ Duration maxLatency) {
+ this.pubsubFactory = pubsubFactory;
+ this.topic = topic;
+ this.elementCoder = elementCoder;
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
+ this.numShards = numShards;
+ this.publishBatchSize = publishBatchSize;
+ this.publishBatchBytes = publishBatchBytes;
+ this.maxLatency = maxLatency;
+ }
+
+ public PubsubUnboundedSink(
+ PubsubClientFactory pubsubFactory,
+ TopicPath topic,
+ Coder<T> elementCoder,
+ String timestampLabel,
+ String idLabel,
+ int numShards) {
+ this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards,
+ DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY);
+ }
+
+ public TopicPath getTopic() {
+ return topic;
+ }
+
+ @Nullable
+ public String getTimestampLabel() {
+ return timestampLabel;
+ }
+
+ @Nullable
+ public String getIdLabel() {
+ return idLabel;
+ }
+
+ public Coder<T> getElementCoder() {
+ return elementCoder;
+ }
+
+ @Override
+ public PDone apply(PCollection<T> input) {
+ input.apply(
+ Window.named("PubsubUnboundedSink.Window")
+ .<T>into(new GlobalWindows())
+ .triggering(
+ Repeatedly.forever(
+ AfterFirst.of(AfterPane.elementCountAtLeast(publishBatchSize),
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(maxLatency))))
+ .discardingFiredPanes())
+ .apply(ParDo.named("PubsubUnboundedSink.Shard")
+ .of(new ShardFn<T>(elementCoder, numShards)))
+ .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
+ .apply(GroupByKey.<Integer, OutgoingMessage>create())
+ .apply(ParDo.named("PubsubUnboundedSink.Writer")
+ .of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel,
+ publishBatchSize, publishBatchBytes)));
+ return PDone.in(input.getPipeline());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/040f8f98/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
index f0a9096..29d0fd5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
@@ -55,7 +55,7 @@ import javax.annotation.Nullable;
*/
public class PubsubApiaryClient extends PubsubClient {
- public static final PubsubClientFactory FACTORY = new PubsubClientFactory() {
+ private static class PubsubApiaryClientFactory implements PubsubClientFactory {
@Override
public PubsubClient newClient(
@Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
@@ -73,7 +73,17 @@ public class PubsubApiaryClient extends PubsubClient {
.build();
return new PubsubApiaryClient(timestampLabel, idLabel, pubsub);
}
- };
+
+ @Override
+ public String getKind() {
+ return "Apiary";
+ }
+ }
+
+ /**
+ * Factory for creating Pubsub clients using Apiary transport.
+ */
+ public static final PubsubClientFactory FACTORY = new PubsubApiaryClientFactory();
/**
* Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/040f8f98/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
index a44329d..9c75003 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
@@ -54,6 +54,11 @@ public abstract class PubsubClient implements Closeable {
@Nullable String timestampLabel,
@Nullable String idLabel,
PubsubOptions options) throws IOException;
+
+ /**
+ * Return the display name for this factory, e.g. "Apiary" or "Grpc".
+ */
+ String getKind();
}
/**
@@ -205,7 +210,7 @@ public abstract class PubsubClient implements Closeable {
}
public static SubscriptionPath subscriptionPathFromPath(String path) {
- return new SubscriptionPath(path);
+ return new SubscriptionPath(path);
}
public static SubscriptionPath subscriptionPathFromName(
@@ -287,6 +292,12 @@ public abstract class PubsubClient implements Closeable {
}
@Override
+ public String toString() {
+ return String.format("OutgoingMessage(%db, %dms)",
+ elementBytes.length, timestampMsSinceEpoch);
+ }
+
+ @Override
public boolean equals(Object o) {
if (this == o) {
return true;
@@ -361,6 +372,12 @@ public abstract class PubsubClient implements Closeable {
}
@Override
+ public String toString() {
+ return String.format("IncomingMessage(%db, %dms)",
+ elementBytes.length, timestampMsSinceEpoch);
+ }
+
+ @Override
public boolean equals(Object o) {
if (this == o) {
return true;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/040f8f98/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
index b3c1b8f..bb535aa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
@@ -75,36 +75,47 @@ import javax.annotation.Nullable;
public class PubsubGrpcClient extends PubsubClient {
private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
private static final int PUBSUB_PORT = 443;
+ // Will be needed when credentials are correctly constructed and scoped.
+ @SuppressWarnings("unused")
private static final List<String> PUBSUB_SCOPES =
Collections.singletonList("https://www.googleapis.com/auth/pubsub");
private static final int LIST_BATCH_SIZE = 1000;
private static final int DEFAULT_TIMEOUT_S = 15;
- public static final PubsubClientFactory FACTORY =
- new PubsubClientFactory() {
- @Override
- public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
- throws IOException {
- ManagedChannel channel = NettyChannelBuilder
- .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
- .negotiationType(NegotiationType.TLS)
- .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
- .build();
- // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the
- // various command line options. It currently only supports the older
- // com.google.api.client.auth.oauth2.Credentials.
- GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
- return new PubsubGrpcClient(timestampLabel,
- idLabel,
- DEFAULT_TIMEOUT_S,
- channel,
- credentials,
- null /* publisher stub */,
- null /* subscriber stub */);
- }
- };
+ private static class PubsubGrpcClientFactory implements PubsubClientFactory {
+ @Override
+ public PubsubClient newClient(
+ @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+ throws IOException {
+ ManagedChannel channel = NettyChannelBuilder
+ .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
+ .negotiationType(NegotiationType.TLS)
+ .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
+ .build();
+ // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the
+ // various command line options. It currently only supports the older
+ // com.google.api.client.auth.oauth2.Credentials.
+ GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
+ return new PubsubGrpcClient(timestampLabel,
+ idLabel,
+ DEFAULT_TIMEOUT_S,
+ channel,
+ credentials,
+ null /* publisher stub */,
+ null /* subscriber stub */);
+ }
+
+ @Override
+ public String getKind() {
+ return "Grpc";
+ }
+ }
+
+ /**
+ * Factory for creating Pubsub clients using gRPC transport.
+ */
+ public static final PubsubClientFactory FACTORY = new PubsubGrpcClientFactory();
/**
* Timeout for grpc calls (in s).
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/040f8f98/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
index 4a47c30..9c3dd85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
@@ -51,6 +51,11 @@ public class PubsubTestClient extends PubsubClient {
throws IOException {
return new PubsubTestClient(expectedTopic, null, 0, expectedOutgoingMessages, null);
}
+
+ @Override
+ public String getKind() {
+ return "PublishTest";
+ }
};
}
@@ -66,6 +71,11 @@ public class PubsubTestClient extends PubsubClient {
return new PubsubTestClient(null, expectedSubscription, ackTimeoutSec,
null, expectedIncomingMessages);
}
+
+ @Override
+ public String getKind() {
+ return "PullTest";
+ }
};
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/040f8f98/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
new file mode 100644
index 0000000..2cb9a65
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+import org.apache.beam.sdk.util.PubsubTestClient;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Test PubsubUnboundedSink.
+ */
+@RunWith(JUnit4.class)
+public class PubsubUnboundedSinkTest {
+ private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+ private static final String DATA = "testData";
+ private static final long TIMESTAMP = 1234L;
+ private static final String TIMESTAMP_LABEL = "timestamp";
+ private static final String ID_LABEL = "id";
+
+ private static class Stamp extends DoFn<String, String> {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP));
+ }
+ }
+
+ @Test
+ public void saneCoder() throws Exception {
+ OutgoingMessage message = new OutgoingMessage(DATA.getBytes(), TIMESTAMP);
+ CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message);
+ CoderProperties.coderSerializable(PubsubUnboundedSink.CODER);
+ }
+
+ @Test
+ public void sendOneMessage() {
+ Set<OutgoingMessage> outgoing =
+ Sets.newHashSet(new OutgoingMessage(DATA.getBytes(), TIMESTAMP));
+ PubsubClientFactory factory =
+ PubsubTestClient.createFactoryForPublish(TOPIC, outgoing);
+ PubsubUnboundedSink<String> sink =
+ new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+ 10);
+ TestPipeline p = TestPipeline.create();
+ p.apply(Create.of(ImmutableList.of(DATA)))
+ .apply(ParDo.of(new Stamp()))
+ .apply(sink);
+ // Run the pipeline. The PubsubTestClient will assert fail if the actual published
+ // message does not match the expected publish message.
+ p.run();
+ }
+
+ @Test
+ public void sendMoreThanOneBatchByNumMessages() {
+ Set<OutgoingMessage> outgoing = new HashSet<>();
+ List<String> data = new ArrayList<>();
+ int batchSize = 2;
+ int batchBytes = 1000;
+ for (int i = 0; i < batchSize * 10; i++) {
+ String str = String.valueOf(i);
+ outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP));
+ data.add(str);
+ }
+ PubsubClientFactory factory =
+ PubsubTestClient.createFactoryForPublish(TOPIC, outgoing);
+ PubsubUnboundedSink<String> sink =
+ new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+ 10, batchSize, batchBytes, Duration.standardSeconds(2));
+ TestPipeline p = TestPipeline.create();
+ p.apply(Create.of(data))
+ .apply(ParDo.of(new Stamp()))
+ .apply(sink);
+ // Run the pipeline. The PubsubTestClient will assert fail if the actual published
+ // message does not match the expected publish message.
+ p.run();
+ }
+
+ @Test
+ public void sendMoreThanOneBatchByByteSize() {
+ Set<OutgoingMessage> outgoing = new HashSet<>();
+ List<String> data = new ArrayList<>();
+ int batchSize = 100;
+ int batchBytes = 10;
+ int n = 0;
+ while (n < batchBytes * 10) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < batchBytes; i++) {
+ sb.append(String.valueOf(n));
+ }
+ String str = sb.toString();
+ outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP));
+ data.add(str);
+ n += str.length();
+ }
+ PubsubClientFactory factory =
+ PubsubTestClient.createFactoryForPublish(TOPIC, outgoing);
+ PubsubUnboundedSink<String> sink =
+ new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+ 10, batchSize, batchBytes, Duration.standardSeconds(2));
+ TestPipeline p = TestPipeline.create();
+ p.apply(Create.of(data))
+ .apply(ParDo.of(new Stamp()))
+ .apply(sink);
+ // Run the pipeline. The PubsubTestClient will assert fail if the actual published
+ // message does not match the expected publish message.
+ p.run();
+ }
+}