You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/12 16:57:05 UTC

[2/2] incubator-beam git commit: [BEAM-53] Add PubsubUnboundedSink and tests

[BEAM-53] Add PubsubUnboundedSink and tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/040f8f98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/040f8f98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/040f8f98

Branch: refs/heads/master
Commit: 040f8f986bb22c8adbe29b1dbb8465bf38d9476f
Parents: 123674f
Author: Mark Shields <ma...@google.com>
Authored: Tue Apr 26 18:42:00 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 12 09:56:52 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/PubsubUnboundedSink.java | 392 +++++++++++++++++++
 .../beam/sdk/util/PubsubApiaryClient.java       |  14 +-
 .../org/apache/beam/sdk/util/PubsubClient.java  |  19 +-
 .../apache/beam/sdk/util/PubsubGrpcClient.java  |  59 +--
 .../apache/beam/sdk/util/PubsubTestClient.java  |  10 +
 .../beam/sdk/io/PubsubUnboundedSinkTest.java    | 145 +++++++
 6 files changed, 612 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/040f8f98/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
new file mode 100644
index 0000000..6d08a70
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import javax.annotation.Nullable;
+
+/**
+ * A PTransform which streams messages to Pubsub.
+ * <ul>
+ * <li>The underlying implementation is just a {@link GroupByKey} followed by a {@link ParDo} which
+ * publishes as a side effect. (In the future we want to design and switch to a custom
+ * {@code UnboundedSink} implementation so as to gain access to system watermark and
+ * end-of-pipeline cleanup.)
+ * <li>We try to send messages in batches while also limiting send latency.
+ * <li>No stats are logged. Rather some counters are used to keep track of elements and batches.
+ * <li>Though some background threads are used by the underlying netty system all actual Pubsub
+ * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances
+ * to execute concurrently and hide latency.
+ * <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer
+ * to dedup messages.
+ * </ul>
+ */
+public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
+  private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSink.class);
+
+  /**
+   * Default maximum number of messages per publish.
+   */
+  private static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000;
+
+  /**
+   * Default maximum size of a publish batch, in bytes.
+   */
+  private static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000;
+
+  /**
+   * Default longest delay between receiving a message and pushing it to Pubsub.
+   */
+  private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2);
+
+  /**
+   * Coder for conveying outgoing messages between internal stages.
+   */
+  private static class OutgoingMessageCoder extends CustomCoder<OutgoingMessage> {
+    @Override
+    public void encode(
+        OutgoingMessage value, OutputStream outStream, Context context)
+        throws CoderException, IOException {
+      ByteArrayCoder.of().encode(value.elementBytes, outStream, Context.NESTED);
+      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, Context.NESTED);
+    }
+
+    @Override
+    public OutgoingMessage decode(
+        InputStream inStream, Context context) throws CoderException, IOException {
+      byte[] elementBytes = ByteArrayCoder.of().decode(inStream, Context.NESTED);
+      long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, Context.NESTED);
+      return new OutgoingMessage(elementBytes, timestampMsSinceEpoch);
+    }
+  }
+
+  @VisibleForTesting
+  static final Coder<OutgoingMessage> CODER = new OutgoingMessageCoder();
+
+  // ================================================================================
+  // ShardFn
+  // ================================================================================
+
+  /**
+   * Convert elements to messages and shard them.
+   */
+  private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> {
+    private final Aggregator<Long, Long> elementCounter =
+        createAggregator("elements", new Sum.SumLongFn());
+    private final Coder<T> elementCoder;
+    private final int numShards;
+
+    ShardFn(Coder<T> elementCoder, int numShards) {
+      this.elementCoder = elementCoder;
+      this.numShards = numShards;
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      elementCounter.addValue(1L);
+      byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
+      long timestampMsSinceEpoch = c.timestamp().getMillis();
+      c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards),
+                     new OutgoingMessage(elementBytes, timestampMsSinceEpoch)));
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("numShards", numShards));
+    }
+  }
+
+  // ================================================================================
+  // WriterFn
+  // ================================================================================
+
+  /**
+   * Publish messages to Pubsub in batches.
+   */
+  private static class WriterFn
+      extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
+    private final PubsubClientFactory pubsubFactory;
+    private final TopicPath topic;
+    private final String timestampLabel;
+    private final String idLabel;
+    private final int publishBatchSize;
+    private final int publishBatchBytes;
+
+    /**
+     * Client on which to talk to Pubsub. Null until created by {@link #startBundle}.
+     */
+    @Nullable
+    private transient PubsubClient pubsubClient;
+
+    private final Aggregator<Long, Long> batchCounter =
+        createAggregator("batches", new Sum.SumLongFn());
+    private final Aggregator<Long, Long> elementCounter =
+        createAggregator("elements", new Sum.SumLongFn());
+    private final Aggregator<Long, Long> byteCounter =
+        createAggregator("bytes", new Sum.SumLongFn());
+
+    WriterFn(
+        PubsubClientFactory pubsubFactory, TopicPath topic, String timestampLabel,
+        String idLabel, int publishBatchSize, int publishBatchBytes) {
+      this.pubsubFactory = pubsubFactory;
+      this.topic = topic;
+      this.timestampLabel = timestampLabel;
+      this.idLabel = idLabel;
+      this.publishBatchSize = publishBatchSize;
+      this.publishBatchBytes = publishBatchBytes;
+    }
+
+    /**
+     * BLOCKING
+     * Send {@code messages} as a batch to Pubsub.
+     */
+    private void publishBatch(List<OutgoingMessage> messages, int bytes)
+        throws IOException {
+      long nowMsSinceEpoch = System.currentTimeMillis();
+      int n = pubsubClient.publish(topic, messages);
+      checkState(n == messages.size(), "Attempted to publish %d messages but %d were successful",
+                 messages.size(), n);
+      batchCounter.addValue(1L);
+      elementCounter.addValue((long) messages.size());
+      byteCounter.addValue((long) bytes);
+    }
+
+    @Override
+    public void startBundle(Context c) throws Exception {
+      checkState(pubsubClient == null, "startBundle invoked without prior finishBundle");
+      pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel,
+                                             c.getPipelineOptions().as(PubsubOptions.class));
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize);
+      int bytes = 0;
+      for (OutgoingMessage message : c.element().getValue()) {
+        if (!pubsubMessages.isEmpty()
+            && bytes + message.elementBytes.length > publishBatchBytes) {
+          // Break large (in bytes) batches into smaller.
+          // (We've already broken by batch size using the trigger below, though that may
+          // run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since
+          // the hard limit from Pubsub is by bytes rather than number of messages.)
+          // BLOCKS until published.
+          publishBatch(pubsubMessages, bytes);
+          pubsubMessages.clear();
+          bytes = 0;
+        }
+        pubsubMessages.add(message);
+        bytes += message.elementBytes.length;
+      }
+      if (!pubsubMessages.isEmpty()) {
+        // BLOCKS until published.
+        publishBatch(pubsubMessages, bytes);
+      }
+    }
+
+    @Override
+    public void finishBundle(Context c) throws Exception {
+      pubsubClient.close();
+      pubsubClient = null;
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("topic", topic.getPath()));
+      builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
+      builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
+      builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
+    }
+  }
+
+  // ================================================================================
+  // PubsubUnboundedSink
+  // ================================================================================
+
+  /**
+   * Which factory to use for creating Pubsub transport.
+   */
+  private final PubsubClientFactory pubsubFactory;
+
+  /**
+   * Pubsub topic to publish to.
+   */
+  private final TopicPath topic;
+
+  /**
+   * Coder for elements. It is the responsibility of the underlying Pubsub transport to
+   * re-encode element bytes if necessary, eg as Base64 strings.
+   */
+  private final Coder<T> elementCoder;
+
+  /**
+   * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use
+   * Pubsub message publish timestamp instead.
+   */
+  @Nullable
+  private final String timestampLabel;
+
+  /**
+   * Pubsub metadata field holding id for each element, or {@literal null} if need to generate
+   * a unique id ourselves.
+   */
+  @Nullable
+  private final String idLabel;
+
+  /**
+   * Number of 'shards' to use so that latency in Pubsub publish can be hidden. Generally this
+   * should be a small multiple of the number of available cores. Too smoll a number results
+   * in too much time lost to blocking Pubsub calls. To large a number results in too many
+   * single-element batches being sent to Pubsub with high per-batch overhead.
+   */
+  private final int numShards;
+
+  /**
+   * Maximum number of messages per publish.
+   */
+  private final int publishBatchSize;
+
+  /**
+   * Maximum size of a publish batch, in bytes.
+   */
+  private final int publishBatchBytes;
+
+  /**
+   * Longest delay between receiving a message and pushing it to Pubsub.
+   */
+  private final Duration maxLatency;
+
+  @VisibleForTesting
+  PubsubUnboundedSink(
+      PubsubClientFactory pubsubFactory,
+      TopicPath topic,
+      Coder<T> elementCoder,
+      String timestampLabel,
+      String idLabel,
+      int numShards,
+      int publishBatchSize,
+      int publishBatchBytes,
+      Duration maxLatency) {
+    this.pubsubFactory = pubsubFactory;
+    this.topic = topic;
+    this.elementCoder = elementCoder;
+    this.timestampLabel = timestampLabel;
+    this.idLabel = idLabel;
+    this.numShards = numShards;
+    this.publishBatchSize = publishBatchSize;
+    this.publishBatchBytes = publishBatchBytes;
+    this.maxLatency = maxLatency;
+  }
+
+  public PubsubUnboundedSink(
+      PubsubClientFactory pubsubFactory,
+      TopicPath topic,
+      Coder<T> elementCoder,
+      String timestampLabel,
+      String idLabel,
+      int numShards) {
+    this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards,
+         DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY);
+  }
+
+  public TopicPath getTopic() {
+    return topic;
+  }
+
+  @Nullable
+  public String getTimestampLabel() {
+    return timestampLabel;
+  }
+
+  @Nullable
+  public String getIdLabel() {
+    return idLabel;
+  }
+
+  public Coder<T> getElementCoder() {
+    return elementCoder;
+  }
+
+  @Override
+  public PDone apply(PCollection<T> input) {
+    input.apply(
+        Window.named("PubsubUnboundedSink.Window")
+            .<T>into(new GlobalWindows())
+            .triggering(
+                Repeatedly.forever(
+                    AfterFirst.of(AfterPane.elementCountAtLeast(publishBatchSize),
+                                  AfterProcessingTime.pastFirstElementInPane()
+                                                     .plusDelayOf(maxLatency))))
+            .discardingFiredPanes())
+         .apply(ParDo.named("PubsubUnboundedSink.Shard")
+                     .of(new ShardFn<T>(elementCoder, numShards)))
+         .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
+         .apply(GroupByKey.<Integer, OutgoingMessage>create())
+         .apply(ParDo.named("PubsubUnboundedSink.Writer")
+                     .of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel,
+                                      publishBatchSize, publishBatchBytes)));
+    return PDone.in(input.getPipeline());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/040f8f98/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
index f0a9096..29d0fd5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
@@ -55,7 +55,7 @@ import javax.annotation.Nullable;
  */
 public class PubsubApiaryClient extends PubsubClient {
 
-  public static final PubsubClientFactory FACTORY = new PubsubClientFactory() {
+  private static class PubsubApiaryClientFactory implements PubsubClientFactory {
     @Override
     public PubsubClient newClient(
         @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
@@ -73,7 +73,17 @@ public class PubsubApiaryClient extends PubsubClient {
           .build();
       return new PubsubApiaryClient(timestampLabel, idLabel, pubsub);
     }
-  };
+
+    @Override
+    public String getKind() {
+      return "Apiary";
+    }
+  }
+
+  /**
+   * Factory for creating Pubsub clients using Apiary transport.
+   */
+  public static final PubsubClientFactory FACTORY = new PubsubApiaryClientFactory();
 
   /**
    * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/040f8f98/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
index a44329d..9c75003 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
@@ -54,6 +54,11 @@ public abstract class PubsubClient implements Closeable {
         @Nullable String timestampLabel,
         @Nullable String idLabel,
         PubsubOptions options) throws IOException;
+
+    /**
+     * Return the display name for this factory. Eg "Apiary", "gRPC".
+     */
+    String getKind();
   }
 
   /**
@@ -205,7 +210,7 @@ public abstract class PubsubClient implements Closeable {
   }
 
   public static SubscriptionPath subscriptionPathFromPath(String path) {
-      return new SubscriptionPath(path);
+    return new SubscriptionPath(path);
   }
 
   public static SubscriptionPath subscriptionPathFromName(
@@ -287,6 +292,12 @@ public abstract class PubsubClient implements Closeable {
     }
 
     @Override
+    public String toString() {
+      return String.format("OutgoingMessage(%db, %dms)",
+                           elementBytes.length, timestampMsSinceEpoch);
+    }
+
+    @Override
     public boolean equals(Object o) {
       if (this == o) {
         return true;
@@ -361,6 +372,12 @@ public abstract class PubsubClient implements Closeable {
     }
 
     @Override
+    public String toString() {
+      return String.format("IncomingMessage(%db, %dms)",
+                           elementBytes.length, timestampMsSinceEpoch);
+    }
+
+    @Override
     public boolean equals(Object o) {
       if (this == o) {
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/040f8f98/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
index b3c1b8f..bb535aa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
@@ -75,36 +75,47 @@ import javax.annotation.Nullable;
 public class PubsubGrpcClient extends PubsubClient {
   private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
   private static final int PUBSUB_PORT = 443;
+  // Will be needed when credentials are correctly constructed and scoped.
+  @SuppressWarnings("unused")
   private static final List<String> PUBSUB_SCOPES =
       Collections.singletonList("https://www.googleapis.com/auth/pubsub");
   private static final int LIST_BATCH_SIZE = 1000;
 
   private static final int DEFAULT_TIMEOUT_S = 15;
 
-  public static final PubsubClientFactory FACTORY =
-      new PubsubClientFactory() {
-        @Override
-        public PubsubClient newClient(
-            @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
-            throws IOException {
-          ManagedChannel channel = NettyChannelBuilder
-              .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
-              .negotiationType(NegotiationType.TLS)
-              .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
-              .build();
-          // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the
-          // various command line options. It currently only supports the older
-          // com.google.api.client.auth.oauth2.Credentials.
-          GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
-          return new PubsubGrpcClient(timestampLabel,
-                                      idLabel,
-                                      DEFAULT_TIMEOUT_S,
-                                      channel,
-                                      credentials,
-                                      null /* publisher stub */,
-                                      null /* subscriber stub */);
-        }
-      };
+  private static class PubsubGrpcClientFactory implements PubsubClientFactory {
+    @Override
+    public PubsubClient newClient(
+        @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+        throws IOException {
+      ManagedChannel channel = NettyChannelBuilder
+          .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
+          .negotiationType(NegotiationType.TLS)
+          .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
+          .build();
+      // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the
+      // various command line options. It currently only supports the older
+      // com.google.api.client.auth.oauth2.Credentials.
+      GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
+      return new PubsubGrpcClient(timestampLabel,
+                                  idLabel,
+                                  DEFAULT_TIMEOUT_S,
+                                  channel,
+                                  credentials,
+                                  null /* publisher stub */,
+                                  null /* subscriber stub */);
+    }
+
+    @Override
+    public String getKind() {
+      return "Grpc";
+    }
+  }
+
+  /**
+   * Factory for creating Pubsub clients using gRCP transport.
+   */
+  public static final PubsubClientFactory FACTORY = new PubsubGrpcClientFactory();
 
   /**
    * Timeout for grpc calls (in s).

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/040f8f98/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
index 4a47c30..9c3dd85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
@@ -51,6 +51,11 @@ public class PubsubTestClient extends PubsubClient {
           throws IOException {
         return new PubsubTestClient(expectedTopic, null, 0, expectedOutgoingMessages, null);
       }
+
+      @Override
+      public String getKind() {
+        return "PublishTest";
+      }
     };
   }
 
@@ -66,6 +71,11 @@ public class PubsubTestClient extends PubsubClient {
         return new PubsubTestClient(null, expectedSubscription, ackTimeoutSec,
                                     null, expectedIncomingMessages);
       }
+
+      @Override
+      public String getKind() {
+        return "PullTest";
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/040f8f98/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
new file mode 100644
index 0000000..2cb9a65
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+import org.apache.beam.sdk.util.PubsubTestClient;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Test PubsubUnboundedSink.
+ */
+@RunWith(JUnit4.class)
+public class PubsubUnboundedSinkTest {
+  private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+  private static final String DATA = "testData";
+  private static final long TIMESTAMP = 1234L;
+  private static final String TIMESTAMP_LABEL = "timestamp";
+  private static final String ID_LABEL = "id";
+
+  private static class Stamp extends DoFn<String, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP));
+    }
+  }
+
+  @Test
+  public void saneCoder() throws Exception {
+    OutgoingMessage message = new OutgoingMessage(DATA.getBytes(), TIMESTAMP);
+    CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message);
+    CoderProperties.coderSerializable(PubsubUnboundedSink.CODER);
+  }
+
+  @Test
+  public void sendOneMessage() {
+    Set<OutgoingMessage> outgoing =
+        Sets.newHashSet(new OutgoingMessage(DATA.getBytes(), TIMESTAMP));
+    PubsubClientFactory factory =
+        PubsubTestClient.createFactoryForPublish(TOPIC, outgoing);
+    PubsubUnboundedSink<String> sink =
+        new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+                                  10);
+    TestPipeline p = TestPipeline.create();
+    p.apply(Create.of(ImmutableList.of(DATA)))
+     .apply(ParDo.of(new Stamp()))
+     .apply(sink);
+    // Run the pipeline. The PubsubTestClient will assert fail if the actual published
+    // message does not match the expected publish message.
+    p.run();
+  }
+
+  @Test
+  public void sendMoreThanOneBatchByNumMessages() {
+    Set<OutgoingMessage> outgoing = new HashSet<>();
+    List<String> data = new ArrayList<>();
+    int batchSize = 2;
+    int batchBytes = 1000;
+    for (int i = 0; i < batchSize * 10; i++) {
+      String str = String.valueOf(i);
+      outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP));
+      data.add(str);
+    }
+    PubsubClientFactory factory =
+        PubsubTestClient.createFactoryForPublish(TOPIC, outgoing);
+    PubsubUnboundedSink<String> sink =
+        new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+                                  10, batchSize, batchBytes, Duration.standardSeconds(2));
+    TestPipeline p = TestPipeline.create();
+    p.apply(Create.of(data))
+     .apply(ParDo.of(new Stamp()))
+     .apply(sink);
+    // Run the pipeline. The PubsubTestClient will assert fail if the actual published
+    // message does not match the expected publish message.
+    p.run();
+  }
+
+  @Test
+  public void sendMoreThanOneBatchByByteSize() {
+    Set<OutgoingMessage> outgoing = new HashSet<>();
+    List<String> data = new ArrayList<>();
+    int batchSize = 100;
+    int batchBytes = 10;
+    int n = 0;
+    while (n < batchBytes * 10) {
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < batchBytes; i++) {
+        sb.append(String.valueOf(n));
+      }
+      String str = sb.toString();
+      outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP));
+      data.add(str);
+      n += str.length();
+    }
+    PubsubClientFactory factory =
+        PubsubTestClient.createFactoryForPublish(TOPIC, outgoing);
+    PubsubUnboundedSink<String> sink =
+        new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+                                  10, batchSize, batchBytes, Duration.standardSeconds(2));
+    TestPipeline p = TestPipeline.create();
+    p.apply(Create.of(data))
+     .apply(ParDo.of(new Stamp()))
+     .apply(sink);
+    // Run the pipeline. The PubsubTestClient will assert fail if the actual published
+    // message does not match the expected publish message.
+    p.run();
+  }
+}