Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/20 01:25:06 UTC

incubator-beam git commit: PubsubIO: integrate the new PubsubUnboundedSource and Sink

Repository: incubator-beam
Updated Branches:
  refs/heads/master 662e49351 -> 26941f152


PubsubIO: integrate the new PubsubUnboundedSource and Sink


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/26941f15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/26941f15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/26941f15

Branch: refs/heads/master
Commit: 26941f152cb5bed422ff14ccb10403604a611130
Parents: 662e493
Author: Mark Shields <ma...@google.com>
Authored: Mon Apr 11 17:36:27 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 19 18:24:56 2016 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineRunner.java        | 249 ++++++++++++++++---
 .../dataflow/DataflowPipelineTranslator.java    |   8 -
 .../dataflow/internal/PubsubIOTranslator.java   | 108 --------
 .../dataflow/io/DataflowPubsubIOTest.java       |  13 +-
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 102 +++++---
 .../apache/beam/sdk/io/PubsubUnboundedSink.java |  67 ++++-
 .../beam/sdk/io/PubsubUnboundedSource.java      | 131 ++++++++--
 .../beam/sdk/util/PubsubApiaryClient.java       |  20 +-
 .../org/apache/beam/sdk/util/PubsubClient.java  |  82 ++++--
 .../apache/beam/sdk/util/PubsubGrpcClient.java  |  19 +-
 .../apache/beam/sdk/util/PubsubTestClient.java  |  21 +-
 .../beam/sdk/io/PubsubUnboundedSinkTest.java    |  48 ++--
 .../beam/sdk/io/PubsubUnboundedSourceTest.java  |   8 +-
 .../beam/sdk/util/PubsubApiaryClientTest.java   |  12 +-
 .../beam/sdk/util/PubsubGrpcClientTest.java     |  12 +-
 .../beam/sdk/util/PubsubTestClientTest.java     |  12 +-
 16 files changed, 604 insertions(+), 308 deletions(-)
----------------------------------------------------------------------
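
For orientation, a minimal sketch of the user-facing API whose expansion this commit
re-routes. It uses only the PubsubIO builders exercised in the diff below; the project,
topic and subscription names are placeholders.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class PubsubRoundTrip {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        // With no maxNumRecords/maxReadTime cap this read expands to PubsubUnboundedSource,
        // unless the Dataflow streaming runner overrides it with its Windmill-backed primitive.
        PCollection<String> messages = p.apply(
            PubsubIO.Read.subscription("projects/my-project/subscriptions/my-subscription"));

        // On an unbounded input this write expands to PubsubUnboundedSink,
        // again unless the Dataflow streaming runner overrides it.
        messages.apply(PubsubIO.Write.topic("projects/my-project/topics/my-topic"));

        p.run();
      }
    }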


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
index 8801896..0c77191 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
@@ -33,7 +33,6 @@ import org.apache.beam.runners.dataflow.internal.IsmFormat;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
-import org.apache.beam.runners.dataflow.internal.PubsubIOTranslator;
 import org.apache.beam.runners.dataflow.internal.ReadTranslator;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@@ -63,6 +62,8 @@ import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.BigQueryIO;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.PubsubUnboundedSink;
+import org.apache.beam.sdk.io.PubsubUnboundedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.ShardNameTemplate;
 import org.apache.beam.sdk.io.TextIO;
@@ -107,6 +108,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -177,6 +179,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import javax.annotation.Nullable;
 
 /**
  * A {@link PipelineRunner} that executes the operations in the
@@ -338,33 +341,46 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
     this.pcollectionsRequiringIndexedFormat = new HashSet<>();
     this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
 
+    ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, Class<?>>builder();
     if (options.isStreaming()) {
-      overrides = ImmutableMap.<Class<?>, Class<?>>builder()
-          .put(Combine.GloballyAsSingletonView.class, StreamingCombineGloballyAsSingletonView.class)
-          .put(Create.Values.class, StreamingCreate.class)
-          .put(View.AsMap.class, StreamingViewAsMap.class)
-          .put(View.AsMultimap.class, StreamingViewAsMultimap.class)
-          .put(View.AsSingleton.class, StreamingViewAsSingleton.class)
-          .put(View.AsList.class, StreamingViewAsList.class)
-          .put(View.AsIterable.class, StreamingViewAsIterable.class)
-          .put(Write.Bound.class, StreamingWrite.class)
-          .put(PubsubIO.Write.Bound.class, StreamingPubsubIOWrite.class)
-          .put(Read.Unbounded.class, StreamingUnboundedRead.class)
-          .put(Read.Bounded.class, UnsupportedIO.class)
-          .put(AvroIO.Read.Bound.class, UnsupportedIO.class)
-          .put(AvroIO.Write.Bound.class, UnsupportedIO.class)
-          .put(BigQueryIO.Read.Bound.class, UnsupportedIO.class)
-          .put(TextIO.Read.Bound.class, UnsupportedIO.class)
-          .put(TextIO.Write.Bound.class, UnsupportedIO.class)
-          .put(Window.Bound.class, AssignWindows.class)
-          .build();
+      builder.put(Combine.GloballyAsSingletonView.class,
+                  StreamingCombineGloballyAsSingletonView.class);
+      builder.put(Create.Values.class, StreamingCreate.class);
+      builder.put(View.AsMap.class, StreamingViewAsMap.class);
+      builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class);
+      builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
+      builder.put(View.AsList.class, StreamingViewAsList.class);
+      builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
+      builder.put(Write.Bound.class, StreamingWrite.class);
+      builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
+      builder.put(Read.Bounded.class, UnsupportedIO.class);
+      builder.put(AvroIO.Read.Bound.class, UnsupportedIO.class);
+      builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class);
+      builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class);
+      builder.put(TextIO.Read.Bound.class, UnsupportedIO.class);
+      builder.put(TextIO.Write.Bound.class, UnsupportedIO.class);
+      builder.put(Window.Bound.class, AssignWindows.class);
+      // In streaming mode must use either the custom Pubsub unbounded source/sink or
+      // defer to Windmill's built-in implementation.
+      builder.put(PubsubIO.Read.Bound.PubsubBoundedReader.class, UnsupportedIO.class);
+      builder.put(PubsubIO.Write.Bound.PubsubBoundedWriter.class, UnsupportedIO.class);
+      if (options.getExperiments() == null
+          || !options.getExperiments().contains("enable_custom_pubsub_source")) {
+        builder.put(PubsubUnboundedSource.class, StreamingPubsubIORead.class);
+      }
+      if (options.getExperiments() == null
+          || !options.getExperiments().contains("enable_custom_pubsub_sink")) {
+        builder.put(PubsubUnboundedSink.class, StreamingPubsubIOWrite.class);
+      }
     } else {
-      ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, Class<?>>builder();
       builder.put(Read.Unbounded.class, UnsupportedIO.class);
       builder.put(Window.Bound.class, AssignWindows.class);
       builder.put(Write.Bound.class, BatchWrite.class);
       builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class);
       builder.put(TextIO.Write.Bound.class, BatchTextIOWrite.class);
+      // In batch mode must use the custom Pubsub bounded source/sink.
+      builder.put(PubsubUnboundedSource.class, UnsupportedIO.class);
+      builder.put(PubsubUnboundedSink.class, UnsupportedIO.class);
       if (options.getExperiments() == null
           || !options.getExperiments().contains("disable_ism_side_input")) {
         builder.put(View.AsMap.class, BatchViewAsMap.class);
@@ -373,8 +389,8 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
         builder.put(View.AsList.class, BatchViewAsList.class);
         builder.put(View.AsIterable.class, BatchViewAsIterable.class);
       }
-      overrides = builder.build();
     }
+    overrides = builder.build();
   }
 
   /**
@@ -2336,27 +2352,104 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
     }
   }
 
+  // ================================================================================
+  // PubsubIO translations
+  // ================================================================================
+
   /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.io.PubsubIO.Write PubsubIO.Write} for the
-   * Dataflow runner in streaming mode.
-   *
-   * <p>For internal use only. Subject to change at any time.
-   *
-   * <p>Public so the {@link PubsubIOTranslator} can access.
+   * Suppress application of {@link PubsubUnboundedSource#apply} in streaming mode so that we
+   * can instead defer to Windmill's implementation.
    */
-  public static class StreamingPubsubIOWrite<T> extends PTransform<PCollection<T>, PDone> {
-    private final PubsubIO.Write.Bound<T> transform;
+  private static class StreamingPubsubIORead<T> extends PTransform<PBegin, PCollection<T>> {
+    private final PubsubUnboundedSource<T> transform;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    public StreamingPubsubIORead(
+        DataflowPipelineRunner runner, PubsubUnboundedSource<T> transform) {
+      this.transform = transform;
+    }
+
+    PubsubUnboundedSource<T> getOverriddenTransform() {
+      return transform;
+    }
+
+    @Override
+    public PCollection<T> apply(PBegin input) {
+      return PCollection.<T>createPrimitiveOutputInternal(
+          input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+          .setCoder(transform.getElementCoder());
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingPubsubIORead";
+    }
+
+    static {
+      DataflowPipelineTranslator.registerTransformTranslator(
+          StreamingPubsubIORead.class, new StreamingPubsubIOReadTranslator());
+    }
+  }
+
+  /**
+   * Rewrite {@link StreamingPubsubIORead} to the appropriate internal node.
+   */
+  private static class StreamingPubsubIOReadTranslator implements
+      TransformTranslator<StreamingPubsubIORead> {
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void translate(
+        StreamingPubsubIORead transform,
+        TranslationContext context) {
+      translateTyped(transform, context);
+    }
+
+    private <T> void translateTyped(
+        StreamingPubsubIORead<T> transform,
+        TranslationContext context) {
+      checkArgument(context.getPipelineOptions().isStreaming(),
+                    "StreamingPubsubIORead is only for streaming pipelines.");
+      PubsubUnboundedSource<T> overriddenTransform = transform.getOverriddenTransform();
+      context.addStep(transform, "ParallelRead");
+      context.addInput(PropertyNames.FORMAT, "pubsub");
+      if (overriddenTransform.getTopic() != null) {
+        context.addInput(PropertyNames.PUBSUB_TOPIC,
+                         overriddenTransform.getTopic().getV1Beta1Path());
+      }
+      if (overriddenTransform.getSubscription() != null) {
+        context.addInput(
+            PropertyNames.PUBSUB_SUBSCRIPTION,
+            overriddenTransform.getSubscription().getV1Beta1Path());
+      }
+      if (overriddenTransform.getTimestampLabel() != null) {
+        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
+                         overriddenTransform.getTimestampLabel());
+      }
+      if (overriddenTransform.getIdLabel() != null) {
+        context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
+      }
+      context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+    }
+  }
+
+  /**
+   * Suppress application of {@link PubsubUnboundedSink#apply} in streaming mode so that we
+   * can instead defer to Windmill's implementation.
+   */
+  private static class StreamingPubsubIOWrite<T> extends PTransform<PCollection<T>, PDone> {
+    private final PubsubUnboundedSink<T> transform;
 
     /**
      * Builds an instance of this class from the overridden transform.
      */
     public StreamingPubsubIOWrite(
-        DataflowPipelineRunner runner, PubsubIO.Write.Bound<T> transform) {
+        DataflowPipelineRunner runner, PubsubUnboundedSink<T> transform) {
       this.transform = transform;
     }
 
-    public PubsubIO.Write.Bound<T> getOverriddenTransform() {
+    PubsubUnboundedSink<T> getOverriddenTransform() {
       return transform;
     }
 
@@ -2369,8 +2462,51 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
     protected String getKindString() {
       return "StreamingPubsubIOWrite";
     }
+
+    static {
+      DataflowPipelineTranslator.registerTransformTranslator(
+          StreamingPubsubIOWrite.class, new StreamingPubsubIOWriteTranslator());
+    }
+  }
+
+  /**
+   * Rewrite {@link StreamingPubsubIOWrite} to the appropriate internal node.
+   */
+  private static class StreamingPubsubIOWriteTranslator implements
+      TransformTranslator<StreamingPubsubIOWrite> {
+
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void translate(
+        StreamingPubsubIOWrite transform,
+        TranslationContext context) {
+      translateTyped(transform, context);
+    }
+
+    private <T> void translateTyped(
+        StreamingPubsubIOWrite<T> transform,
+        TranslationContext context) {
+      checkArgument(context.getPipelineOptions().isStreaming(),
+                    "StreamingPubsubIOWrite is only for streaming pipelines.");
+      PubsubUnboundedSink<T> overriddenTransform = transform.getOverriddenTransform();
+      context.addStep(transform, "ParallelWrite");
+      context.addInput(PropertyNames.FORMAT, "pubsub");
+      context.addInput(PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getV1Beta1Path());
+      if (overriddenTransform.getTimestampLabel() != null) {
+        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
+                         overriddenTransform.getTimestampLabel());
+      }
+      if (overriddenTransform.getIdLabel() != null) {
+        context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
+      }
+      context.addEncodingInput(
+          WindowedValue.getValueOnlyCoder(overriddenTransform.getElementCoder()));
+      context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+    }
   }
 
+  // ================================================================================
+
   /**
    * Specialized implementation for
    * {@link org.apache.beam.sdk.io.Read.Unbounded Read.Unbounded} for the
@@ -2912,11 +3048,14 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
   }
 
   /**
-   * Specialized expansion for unsupported IO transforms that throws an error.
+   * Specialized expansion for unsupported IO transforms and DoFns that throws an error.
    */
   private static class UnsupportedIO<InputT extends PInput, OutputT extends POutput>
       extends PTransform<InputT, OutputT> {
+    @Nullable
     private PTransform<?, ?> transform;
+    @Nullable
+    private DoFn<?, ?> doFn;
 
     /**
      * Builds an instance of this class from the overridden transform.
@@ -2974,13 +3113,51 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
       this.transform = transform;
     }
 
+    /**
+     * Builds an instance of this class from the overridden doFn.
+     */
+    @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
+    public UnsupportedIO(DataflowPipelineRunner runner,
+                         PubsubIO.Read.Bound<?>.PubsubBoundedReader doFn) {
+      this.doFn = doFn;
+    }
+
+    /**
+     * Builds an instance of this class from the overridden doFn.
+     */
+    @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
+    public UnsupportedIO(DataflowPipelineRunner runner,
+                         PubsubIO.Write.Bound<?>.PubsubBoundedWriter doFn) {
+      this.doFn = doFn;
+    }
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
+    public UnsupportedIO(DataflowPipelineRunner runner, PubsubUnboundedSource<?> transform) {
+      this.transform = transform;
+    }
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
+    public UnsupportedIO(DataflowPipelineRunner runner, PubsubUnboundedSink<?> transform) {
+      this.transform = transform;
+    }
+
+
     @Override
     public OutputT apply(InputT input) {
       String mode = input.getPipeline().getOptions().as(StreamingOptions.class).isStreaming()
           ? "streaming" : "batch";
+      String name =
+          transform == null
+              ? approximateSimpleName(doFn.getClass())
+              : approximatePTransformName(transform.getClass());
       throw new UnsupportedOperationException(
-          String.format("The DataflowPipelineRunner in %s mode does not support %s.",
-              mode, approximatePTransformName(transform.getClass())));
+          String.format("The DataflowPipelineRunner in %s mode does not support %s.", mode, name));
     }
   }
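
The override table above only routes PubsubUnboundedSource/Sink to the Windmill-backed
StreamingPubsubIORead/Write when the corresponding experiment is absent. A sketch of
opting in to the SDK-level transforms instead, assuming the experiments list is set
programmatically on DataflowPipelineOptions (it can equally be passed via the
--experiments flag):

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    import java.util.Arrays;

    public class CustomPubsubExperiments {
      static DataflowPipelineOptions optionsWithCustomPubsub() {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        options.setStreaming(true);
        // With these experiments present the runner keeps PubsubUnboundedSource/Sink;
        // without them it substitutes StreamingPubsubIORead/Write.
        options.setExperiments(
            Arrays.asList("enable_custom_pubsub_source", "enable_custom_pubsub_sink"));
        return options;
      }
    }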
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index d822803..7f67393 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -32,7 +32,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.beam.runners.dataflow.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly;
-import org.apache.beam.runners.dataflow.internal.PubsubIOTranslator;
 import org.apache.beam.runners.dataflow.internal.ReadTranslator;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.DoFnInfo;
@@ -41,7 +40,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.io.PubsubIO;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.TransformTreeNode;
@@ -1009,12 +1007,6 @@ public class DataflowPipelineTranslator {
     ///////////////////////////////////////////////////////////////////////////
     // IO Translation.
 
-    registerTransformTranslator(
-        PubsubIO.Read.Bound.class, new PubsubIOTranslator.ReadTranslator());
-    registerTransformTranslator(
-        DataflowPipelineRunner.StreamingPubsubIOWrite.class,
-        new PubsubIOTranslator.WriteTranslator());
-
     registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/PubsubIOTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/PubsubIOTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/PubsubIOTranslator.java
deleted file mode 100755
index 976f948..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/PubsubIOTranslator.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
-import org.apache.beam.sdk.io.PubsubIO;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * Pubsub transform support code for the Dataflow backend.
- */
-public class PubsubIOTranslator {
-
-  /**
-   * Implements PubsubIO Read translation for the Dataflow backend.
-   */
-  public static class ReadTranslator<T> implements TransformTranslator<PubsubIO.Read.Bound<T>> {
-    @Override
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public void translate(
-        PubsubIO.Read.Bound transform,
-        TranslationContext context) {
-      translateReadHelper(transform, context);
-    }
-
-    private <T> void translateReadHelper(
-        PubsubIO.Read.Bound<T> transform,
-        TranslationContext context) {
-      if (!context.getPipelineOptions().isStreaming()) {
-        throw new IllegalArgumentException(
-            "PubsubIO.Read can only be used with the Dataflow streaming runner.");
-      }
-
-      context.addStep(transform, "ParallelRead");
-      context.addInput(PropertyNames.FORMAT, "pubsub");
-      if (transform.getTopic() != null) {
-        context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
-      }
-      if (transform.getSubscription() != null) {
-        context.addInput(
-            PropertyNames.PUBSUB_SUBSCRIPTION, transform.getSubscription().asV1Beta1Path());
-      }
-      if (transform.getTimestampLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
-      }
-      if (transform.getIdLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
-      }
-      context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-    }
-  }
-
-  /**
-   * Implements PubsubIO Write translation for the Dataflow backend.
-   */
-  public static class WriteTranslator<T>
-      implements TransformTranslator<DataflowPipelineRunner.StreamingPubsubIOWrite<T>> {
-
-    @Override
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public void translate(
-        DataflowPipelineRunner.StreamingPubsubIOWrite transform,
-        TranslationContext context) {
-      translateWriteHelper(transform, context);
-    }
-
-    private <T> void translateWriteHelper(
-        DataflowPipelineRunner.StreamingPubsubIOWrite<T> customTransform,
-        TranslationContext context) {
-      if (!context.getPipelineOptions().isStreaming()) {
-        throw new IllegalArgumentException(
-            "PubsubIO.Write is non-primitive for the Dataflow batch runner.");
-      }
-
-      PubsubIO.Write.Bound<T> transform = customTransform.getOverriddenTransform();
-
-      context.addStep(customTransform, "ParallelWrite");
-      context.addInput(PropertyNames.FORMAT, "pubsub");
-      context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
-      if (transform.getTimestampLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
-      }
-      if (transform.getIdLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
-      }
-      context.addEncodingInput(WindowedValue.getValueOnlyCoder(transform.getCoder()));
-      context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(customTransform));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
index 4874877..3df9cdb 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
@@ -42,21 +42,22 @@ public class DataflowPubsubIOTest {
   @Test
   public void testPrimitiveWriteDisplayData() {
     DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
-    PubsubIO.Write.Bound<?> write = PubsubIO.Write
-        .topic("projects/project/topics/topic");
+    PubsubIO.Write.Bound<?> write = PubsubIO.Write.topic("projects/project/topics/topic");
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("PubsubIO.Write should include the topic in its primitive display data",
-        displayData, hasItem(hasDisplayItem("topic")));
+               displayData, hasItem(hasDisplayItem("topic")));
   }
 
   @Test
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
-    PubsubIO.Read.Bound<String> read = PubsubIO.Read.topic("projects/project/topics/topic");
+    PubsubIO.Read.Bound<String> read =
+        PubsubIO.Read.subscription("projects/project/subscriptions/subscription")
+                     .maxNumRecords(1);
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
-    assertThat("PubsubIO.Read should include the topic in its primitive display data",
-        displayData, hasItem(hasDisplayItem("topic")));
+    assertThat("PubsubIO.Read should include the subscription in its primitive display data",
+               displayData, hasItem(hasDisplayItem("subscription")));
   }
 }
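
The read test now has to bound the read: an uncapped PubsubIO.Read expands to
PubsubUnboundedSource rather than translating as a single primitive, so only the capped
path exposes primitive display data. A sketch of the two caps that select the
PubsubBoundedReader path; maxReadTime is assumed from the getMaxReadTime() check in
PubsubIO.Read.Bound#apply below.

    import org.apache.beam.sdk.io.PubsubIO;

    import org.joda.time.Duration;

    public class BoundedReadSelection {
      // Either cap makes getMaxNumRecords() > 0 or getMaxReadTime() != null, which routes
      // PubsubIO.Read.Bound#apply to the ParDo over PubsubBoundedReader.
      static PubsubIO.Read.Bound<String> cappedByCount() {
        return PubsubIO.Read.subscription("projects/project/subscriptions/subscription")
                            .maxNumRecords(1000);
      }

      static PubsubIO.Read.Bound<String> cappedByTime() {
        // maxReadTime(Duration) is assumed to exist alongside getMaxReadTime().
        return PubsubIO.Read.topic("projects/project/topics/topic")
                            .maxReadTime(Duration.standardMinutes(5));
      }
    }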

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 78fec85..23a1140 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -36,10 +36,10 @@ import org.apache.beam.sdk.util.PubsubApiaryClient;
 import org.apache.beam.sdk.util.PubsubClient;
 import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
 import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 
@@ -54,7 +54,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Random;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
@@ -634,12 +633,12 @@ public class PubsubIO {
       @Override
       public PCollection<T> apply(PInput input) {
         if (topic == null && subscription == null) {
-          throw new IllegalStateException("need to set either the topic or the subscription for "
+          throw new IllegalStateException("Need to set either the topic or the subscription for "
               + "a PubsubIO.Read transform");
         }
         if (topic != null && subscription != null) {
-          throw new IllegalStateException("Can't set both the topic and the subscription for a "
-              + "PubsubIO.Read transform");
+          throw new IllegalStateException("Can't set both the topic and the subscription for "
+              + "a PubsubIO.Read transform");
         }
 
         boolean boundedOutput = getMaxNumRecords() > 0 || getMaxReadTime() != null;
@@ -649,9 +648,19 @@ public class PubsubIO {
                       .apply(Create.of((Void) null)).setCoder(VoidCoder.of())
                       .apply(ParDo.of(new PubsubBoundedReader())).setCoder(coder);
         } else {
-          return PCollection.<T>createPrimitiveOutputInternal(
-                  input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-              .setCoder(coder);
+          @Nullable ProjectPath projectPath =
+              topic == null ? null : PubsubClient.projectPathFromId(topic.project);
+          @Nullable TopicPath topicPath =
+              topic == null ? null : PubsubClient.topicPathFromName(topic.project, topic.topic);
+          @Nullable SubscriptionPath subscriptionPath =
+              subscription == null
+                  ? null
+                  : PubsubClient
+                      .subscriptionPathFromName(subscription.project, subscription.subscription);
+          return input.getPipeline().begin()
+                      .apply(new PubsubUnboundedSource<T>(
+                          FACTORY, projectPath, topicPath, subscriptionPath,
+                          coder, timestampLabel, idLabel));
         }
       }
 
@@ -707,12 +716,16 @@ public class PubsubIO {
 
       /**
        * Default reader when Pubsub subscription has some form of upper bound.
-       * <p>TODO: Consider replacing with BoundedReadFromUnboundedSource on top of upcoming
-       * PubsubUnboundedSource.
-       * <p>NOTE: This is not the implementation used when running on the Google Dataflow hosted
-       * service.
+       *
+       * <p>TODO: Consider replacing with BoundedReadFromUnboundedSource on top
+       * of PubsubUnboundedSource.
+       *
+       * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow
+       * service in streaming mode.
+       *
+       * <p>Public so it can be suppressed by runners.
        */
-      private class PubsubBoundedReader extends DoFn<Void, T> {
+      public class PubsubBoundedReader extends DoFn<Void, T> {
         private static final int DEFAULT_PULL_SIZE = 100;
         private static final int ACK_TIMEOUT_SEC = 60;
 
@@ -724,20 +737,20 @@ public class PubsubIO {
 
             PubsubClient.SubscriptionPath subscriptionPath;
             if (getSubscription() == null) {
-              // Create a randomized subscription derived from the topic name.
-              String subscription = getTopic().topic + "_dataflow_" + new Random().nextLong();
+              TopicPath topicPath =
+                  PubsubClient.topicPathFromName(getTopic().project, getTopic().topic);
               // The subscription will be registered under this pipeline's project if we know it.
               // Otherwise we'll fall back to the topic's project.
               // Note that they don't need to be the same.
-              String project = c.getPipelineOptions().as(PubsubOptions.class).getProject();
-              if (Strings.isNullOrEmpty(project)) {
-                project = getTopic().project;
+              String projectId =
+                  c.getPipelineOptions().as(PubsubOptions.class).getProject();
+              if (Strings.isNullOrEmpty(projectId)) {
+                projectId = getTopic().project;
               }
-              subscriptionPath = PubsubClient.subscriptionPathFromName(project, subscription);
-              TopicPath topicPath =
-                  PubsubClient.topicPathFromName(getTopic().project, getTopic().topic);
+              ProjectPath projectPath = PubsubClient.projectPathFromId(projectId);
               try {
-                pubsubClient.createSubscription(topicPath, subscriptionPath, ACK_TIMEOUT_SEC);
+                subscriptionPath =
+                    pubsubClient.createRandomSubscription(projectPath, topicPath, ACK_TIMEOUT_SEC);
               } catch (Exception e) {
                 throw new RuntimeException("Failed to create subscription: ", e);
               }
@@ -795,6 +808,12 @@ public class PubsubIO {
             }
           }
         }
+
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+          super.populateDisplayData(builder);
+          Bound.this.populateDisplayData(builder);
+        }
       }
     }
 
@@ -961,8 +980,20 @@ public class PubsubIO {
         if (topic == null) {
           throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
         }
-        input.apply(ParDo.of(new PubsubWriter()));
-        return PDone.in(input.getPipeline());
+        switch (input.isBounded()) {
+          case BOUNDED:
+            input.apply(ParDo.of(new PubsubBoundedWriter()));
+            return PDone.in(input.getPipeline());
+          case UNBOUNDED:
+            return input.apply(new PubsubUnboundedSink<T>(
+                FACTORY,
+                PubsubClient.topicPathFromName(topic.project, topic.topic),
+                coder,
+                timestampLabel,
+                idLabel,
+                100 /* numShards */));
+        }
+        throw new RuntimeException(); // cases are exhaustive.
       }
 
       @Override
@@ -993,11 +1024,14 @@ public class PubsubIO {
       }
 
       /**
-       * Writer to Pubsub which batches messages.
-       * <p>NOTE: This is not the implementation used when running on the Google Dataflow hosted
-       * service.
+       * Writer to Pubsub which batches messages from bounded collections.
+       *
+       * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow
+       * service in streaming mode.
+       *
+       * <p>Public so it can be suppressed by runners.
        */
-      private class PubsubWriter extends DoFn<T, Void> {
+      public class PubsubBoundedWriter extends DoFn<T, Void> {
         private static final int MAX_PUBLISH_BATCH_SIZE = 100;
         private transient List<OutgoingMessage> output;
         private transient PubsubClient pubsubClient;
@@ -1005,15 +1039,18 @@ public class PubsubIO {
         @Override
         public void startBundle(Context c) throws IOException {
           this.output = new ArrayList<>();
-          this.pubsubClient = FACTORY.newClient(timestampLabel, idLabel,
-                                                c.getPipelineOptions().as(PubsubOptions.class));
+          // NOTE: idLabel is ignored.
+          this.pubsubClient =
+              FACTORY.newClient(timestampLabel, null,
+                                c.getPipelineOptions().as(PubsubOptions.class));
         }
 
         @Override
         public void processElement(ProcessContext c) throws IOException {
+          // NOTE: The record id is always null.
           OutgoingMessage message =
               new OutgoingMessage(CoderUtils.encodeToByteArray(getCoder(), c.element()),
-                  c.timestamp().getMillis());
+                                  c.timestamp().getMillis(), null);
           output.add(message);
 
           if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
@@ -1041,6 +1078,7 @@ public class PubsubIO {
 
         @Override
         public void populateDisplayData(DisplayData.Builder builder) {
+          super.populateDisplayData(builder);
           Bound.this.populateDisplayData(builder);
         }
       }
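
PubsubIO.Write.Bound#apply now dispatches on input.isBounded(): bounded collections keep the
batching PubsubBoundedWriter ParDo, while unbounded collections are handed to
PubsubUnboundedSink. A sketch of that unbounded expansion, with the factory passed in
explicitly (standing in for the FACTORY constant PubsubIO uses) and placeholder names:

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.PubsubUnboundedSink;
    import org.apache.beam.sdk.util.PubsubClient;
    import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory;

    public class UnboundedWriteExpansion {
      static PubsubUnboundedSink<String> sinkFor(PubsubClientFactory factory) {
        // Mirrors the constructor call in PubsubIO.Write.Bound#apply for the UNBOUNDED case.
        return new PubsubUnboundedSink<String>(
            factory,
            PubsubClient.topicPathFromName("my-project", "my-topic"),
            StringUtf8Coder.of(),
            null /* timestampLabel */,
            null /* idLabel */,
            100 /* numShards */);
      }
    }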

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 7ca2b57..6ff9b40 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -26,6 +26,8 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -52,6 +54,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.hash.Hashing;
 
 import org.joda.time.Duration;
 import org.slf4j.Logger;
@@ -62,6 +65,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 import javax.annotation.Nullable;
@@ -81,6 +85,8 @@ import javax.annotation.Nullable;
  * <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer
  * to dedup messages.
  * </ul>
+ *
+ * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow service.
  */
 public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
   private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSink.class);
@@ -104,12 +110,16 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
    * Coder for conveying outgoing messages between internal stages.
    */
   private static class OutgoingMessageCoder extends CustomCoder<OutgoingMessage> {
+    private static final NullableCoder<String> RECORD_ID_CODER =
+        NullableCoder.of(StringUtf8Coder.of());
+
     @Override
     public void encode(
         OutgoingMessage value, OutputStream outStream, Context context)
         throws CoderException, IOException {
       ByteArrayCoder.of().encode(value.elementBytes, outStream, Context.NESTED);
       BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, Context.NESTED);
+      RECORD_ID_CODER.encode(value.recordId, outStream, Context.NESTED);
     }
 
     @Override
@@ -117,7 +127,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
         InputStream inStream, Context context) throws CoderException, IOException {
       byte[] elementBytes = ByteArrayCoder.of().decode(inStream, Context.NESTED);
       long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, Context.NESTED);
-      return new OutgoingMessage(elementBytes, timestampMsSinceEpoch);
+      @Nullable String recordId = RECORD_ID_CODER.decode(inStream, Context.NESTED);
+      return new OutgoingMessage(elementBytes, timestampMsSinceEpoch, recordId);
     }
   }
 
@@ -125,6 +136,23 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
   static final Coder<OutgoingMessage> CODER = new OutgoingMessageCoder();
 
   // ================================================================================
+  // RecordIdMethod
+  // ================================================================================
+
+  /**
+   * Specify how record ids are to be generated.
+   */
+  @VisibleForTesting
+  enum RecordIdMethod {
+    /** Leave null. */
+    NONE,
+    /** Generate randomly. */
+    RANDOM,
+    /** Generate deterministically. For testing only. */
+    DETERMINISTIC
+  }
+
+  // ================================================================================
   // ShardFn
   // ================================================================================
 
@@ -136,10 +164,12 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
         createAggregator("elements", new Sum.SumLongFn());
     private final Coder<T> elementCoder;
     private final int numShards;
+    private final RecordIdMethod recordIdMethod;
 
-    ShardFn(Coder<T> elementCoder, int numShards) {
+    ShardFn(Coder<T> elementCoder, int numShards, RecordIdMethod recordIdMethod) {
       this.elementCoder = elementCoder;
       this.numShards = numShards;
+      this.recordIdMethod = recordIdMethod;
     }
 
     @Override
@@ -147,9 +177,23 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
       elementCounter.addValue(1L);
       byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
       long timestampMsSinceEpoch = c.timestamp().getMillis();
-      // TODO: A random record id should be assigned here.
+      @Nullable String recordId = null;
+      switch (recordIdMethod) {
+        case NONE:
+          break;
+        case DETERMINISTIC:
+          recordId = Hashing.murmur3_128().hashBytes(elementBytes).toString();
+          break;
+        case RANDOM:
+          // Since these elements go through a GroupByKey, any failures while sending to
+          // Pubsub will be retried without falling back and generating a new record id.
+          // Thus even though we may send the same message to Pubsub twice, it is guaranteed
+          // to have the same record id.
+          recordId = UUID.randomUUID().toString();
+          break;
+      }
       c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards),
-                     new OutgoingMessage(elementBytes, timestampMsSinceEpoch)));
+                     new OutgoingMessage(elementBytes, timestampMsSinceEpoch, recordId)));
     }
 
     @Override
@@ -319,6 +363,12 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
    */
   private final Duration maxLatency;
 
+  /**
+   * How record ids should be generated for each record (if {@link #idLabel} is non-{@literal
+   * null}).
+   */
+  private final RecordIdMethod recordIdMethod;
+
   @VisibleForTesting
   PubsubUnboundedSink(
       PubsubClientFactory pubsubFactory,
@@ -329,7 +379,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
       int numShards,
       int publishBatchSize,
       int publishBatchBytes,
-      Duration maxLatency) {
+      Duration maxLatency,
+      RecordIdMethod recordIdMethod) {
     this.pubsubFactory = pubsubFactory;
     this.topic = topic;
     this.elementCoder = elementCoder;
@@ -339,6 +390,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
     this.publishBatchSize = publishBatchSize;
     this.publishBatchBytes = publishBatchBytes;
     this.maxLatency = maxLatency;
+    this.recordIdMethod = idLabel == null ? RecordIdMethod.NONE : recordIdMethod;
   }
 
   public PubsubUnboundedSink(
@@ -349,7 +401,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
       String idLabel,
       int numShards) {
     this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards,
-         DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY);
+         DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY,
+         RecordIdMethod.RANDOM);
   }
 
   public TopicPath getTopic() {
@@ -382,7 +435,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
                                                      .plusDelayOf(maxLatency))))
             .discardingFiredPanes())
          .apply(ParDo.named("PubsubUnboundedSink.Shard")
-                     .of(new ShardFn<T>(elementCoder, numShards)))
+                     .of(new ShardFn<T>(elementCoder, numShards, recordIdMethod)))
          .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
          .apply(GroupByKey.<Integer, OutgoingMessage>create())
          .apply(ParDo.named("PubsubUnboundedSink.Writer")
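
ShardFn now stamps each outgoing message with a record id before the GroupByKey, so a
retried publish resends the same id and the idLabel attribute can be used for dedup
downstream. A small sketch of the two non-null id strategies, using the same Guava hash
as the DETERMINISTIC case; the byte array stands in for the coder-encoded element.

    import com.google.common.hash.Hashing;

    import java.nio.charset.StandardCharsets;
    import java.util.UUID;

    public class RecordIdSketch {
      // DETERMINISTIC: stable across runs for identical encoded elements (testing only).
      static String deterministicId(byte[] elementBytes) {
        return Hashing.murmur3_128().hashBytes(elementBytes).toString();
      }

      // RANDOM: drawn once before the GroupByKey, so retries reuse the same id.
      static String randomId() {
        return UUID.randomUUID().toString();
      }

      public static void main(String[] args) {
        byte[] elementBytes = "hello pubsub".getBytes(StandardCharsets.UTF_8);
        System.out.println(deterministicId(elementBytes));
        System.out.println(randomId());
      }
    }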

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index d635a8a..0492c76 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -42,13 +43,16 @@ import org.apache.beam.sdk.util.BucketingFunction;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.MovingFunction;
 import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
 import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory;
 import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 
 import com.google.api.client.util.Clock;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
@@ -102,11 +106,18 @@ import javax.annotation.Nullable;
  * are blocking. We rely on the underlying runner to allow multiple
  * {@link UnboundedSource.UnboundedReader} instances to execute concurrently and thus hide latency.
  * </ul>
+ *
+ * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow service.
  */
 public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> {
   private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class);
 
   /**
+   * Default ACK timeout for created subscriptions.
+   */
+  private static final int DEAULT_ACK_TIMEOUT_SEC = 60;
+
+  /**
    * Coder for checkpoints.
    */
   private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = new PubsubCheckpointCoder<>();
@@ -292,6 +303,17 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     }
 
     /**
+     * Return current time according to {@code reader}.
+     */
+    private static long now(PubsubReader reader) {
+      if (reader.outer.outer.clock == null) {
+        return System.currentTimeMillis();
+      } else {
+        return reader.outer.outer.clock.currentTimeMillis();
+      }
+    }
+
+    /**
      * BLOCKING
      * NACK all messages which have been read from Pubsub but not passed downstream.
      * This way Pubsub will send them again promptly.
@@ -303,13 +325,13 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       for (String ackId : notYetReadIds) {
         batchYetToAckIds.add(ackId);
         if (batchYetToAckIds.size() >= ACK_BATCH_SIZE) {
-          long nowMsSinceEpoch = reader.outer.outer.clock.currentTimeMillis();
+          long nowMsSinceEpoch = now(reader);
           reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds);
           batchYetToAckIds.clear();
         }
       }
       if (!batchYetToAckIds.isEmpty()) {
-        long nowMsSinceEpoch = reader.outer.outer.clock.currentTimeMillis();
+        long nowMsSinceEpoch = now(reader);
         reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds);
       }
     }
@@ -614,7 +636,11 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
      * Return the current time, in ms since epoch.
      */
     private long now() {
-      return outer.outer.clock.currentTimeMillis();
+      if (outer.outer.clock == null) {
+        return System.currentTimeMillis();
+      } else {
+        return outer.outer.clock.currentTimeMillis();
+      }
     }
 
     /**
@@ -928,7 +954,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       if (current == null) {
         throw new NoSuchElementException();
       }
-      return current.recordId;
+      return current.recordId.getBytes(Charsets.UTF_8);
     }
 
     @Override
@@ -1124,8 +1150,9 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   // ================================================================================
 
   /**
-   * Clock to use for all timekeeping.
+   * For testing only: Clock to use for all timekeeping. If {@literal null}, use the system clock.
    */
+  @Nullable
   private Clock clock;
 
   /**
@@ -1134,9 +1161,28 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   private final PubsubClientFactory pubsubFactory;
 
   /**
-   * Subscription to read from.
+   * Project under which to create a subscription if only the {@link #topic} was given.
+   */
+  @Nullable
+  private final ProjectPath project;
+
+  /**
+   * Topic to read from. If {@literal null}, then {@link #subscription} must be given.
+   * Otherwise {@link #subscription} must be null.
    */
-  private final SubscriptionPath subscription;
+  @Nullable
+  private final TopicPath topic;
+
+  /**
+   * Subscription to read from. If {@literal null} then {@link #topic} must be given.
+   * Otherwise {@link #topic} must be null.
+   *
+   * <p>If no subscription is given, a random one will be created when the transform is
+   * applied. This field will be updated with that subscription's path. The created
+   * subscription is never deleted.
+   */
+  @Nullable
+  private SubscriptionPath subscription;
 
   /**
    * Coder for elements. Elements are effectively double-encoded: first to a byte array
@@ -1159,25 +1205,60 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   @Nullable
   private final String idLabel;
 
-  /**
-   * Construct an unbounded source to consume from the Pubsub {@code subscription}.
-   */
-  public PubsubUnboundedSource(
+  @VisibleForTesting
+  PubsubUnboundedSource(
       Clock clock,
       PubsubClientFactory pubsubFactory,
-      SubscriptionPath subscription,
+      @Nullable ProjectPath project,
+      @Nullable TopicPath topic,
+      @Nullable SubscriptionPath subscription,
       Coder<T> elementCoder,
       @Nullable String timestampLabel,
       @Nullable String idLabel) {
+    checkArgument((topic == null) != (subscription == null),
+                  "Exactly one of topic and subscription must be given");
+    checkArgument((topic == null) == (project == null),
+                  "Project must be given if topic is given");
     this.clock = clock;
     this.pubsubFactory = checkNotNull(pubsubFactory);
-    this.subscription = checkNotNull(subscription);
+    this.project = project;
+    this.topic = topic;
+    this.subscription = subscription;
     this.elementCoder = checkNotNull(elementCoder);
     this.timestampLabel = timestampLabel;
     this.idLabel = idLabel;
   }
 
-  public PubsubClient.SubscriptionPath getSubscription() {
+  /**
+   * Construct an unbounded source to consume from the Pubsub {@code subscription}.
+   */
+  public PubsubUnboundedSource(
+      PubsubClientFactory pubsubFactory,
+      @Nullable ProjectPath project,
+      @Nullable TopicPath topic,
+      @Nullable SubscriptionPath subscription,
+      Coder<T> elementCoder,
+      @Nullable String timestampLabel,
+      @Nullable String idLabel) {
+    this(null, pubsubFactory, project, topic, subscription, elementCoder, timestampLabel, idLabel);
+  }
+
+  public Coder<T> getElementCoder() {
+    return elementCoder;
+  }
+
+  @Nullable
+  public ProjectPath getProject() {
+    return project;
+  }
+
+  @Nullable
+  public TopicPath getTopic() {
+    return topic;
+  }
+
+  @Nullable
+  public SubscriptionPath getSubscription() {
     return subscription;
   }
 
@@ -1191,12 +1272,26 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     return idLabel;
   }
 
-  public Coder<T> getElementCoder() {
-    return elementCoder;
-  }
-
   @Override
   public PCollection<T> apply(PBegin input) {
+    if (subscription == null) {
+      try {
+        try (PubsubClient pubsubClient =
+                 pubsubFactory.newClient(timestampLabel, idLabel,
+                                         input.getPipeline()
+                                              .getOptions()
+                                              .as(PubsubOptions.class))) {
+          subscription =
+              pubsubClient.createRandomSubscription(project, topic, DEAULT_ACK_TIMEOUT_SEC);
+          LOG.warn("Created subscription {} to topic {}."
+                   + " Note this subscription WILL NOT be deleted when the pipeline terminates",
+                   subscription, topic);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to create subscription: ", e);
+      }
+    }
+
     return input.getPipeline().begin()
                 .apply(Read.from(new PubsubSource<T>(this)))
                 .apply(ParDo.named("PubsubUnboundedSource.Stats")
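
PubsubUnboundedSource can now be configured with either an existing subscription or a
topic plus its project, in which case apply() creates a random subscription and never
deletes it. A sketch of the two valid configurations allowed by the new checkArgument
calls, with placeholder names and the factory passed in explicitly:

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.PubsubUnboundedSource;
    import org.apache.beam.sdk.util.PubsubClient;
    import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory;

    public class SourceConfigurations {
      // Read from an existing subscription: topic and project must both be null.
      static PubsubUnboundedSource<String> fromSubscription(PubsubClientFactory factory) {
        return new PubsubUnboundedSource<String>(
            factory,
            null /* project */,
            null /* topic */,
            PubsubClient.subscriptionPathFromName("my-project", "my-subscription"),
            StringUtf8Coder.of(),
            null /* timestampLabel */,
            null /* idLabel */);
      }

      // Read from a topic: apply() will create (and never delete) a random subscription.
      static PubsubUnboundedSource<String> fromTopic(PubsubClientFactory factory) {
        return new PubsubUnboundedSource<String>(
            factory,
            PubsubClient.projectPathFromId("my-project"),
            PubsubClient.topicPathFromName("my-project", "my-topic"),
            null /* subscription */,
            StringUtf8Coder.of(),
            null /* timestampLabel */,
            null /* idLabel */);
      }
    }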

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
index aa73d42..08981d0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
@@ -40,7 +40,6 @@ import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
-import com.google.common.hash.Hashing;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -135,11 +134,8 @@ public class PubsubApiaryClient extends PubsubClient {
         attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
       }
 
-      if (idLabel != null) {
-        // TODO: The id should be associated with the OutgoingMessage so that it is stable
-        // across retried bundles
-        attributes.put(idLabel,
-                       Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString());
+      if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+        attributes.put(idLabel, outgoingMessage.recordId);
       }
 
       pubsubMessages.add(pubsubMessage);
@@ -185,15 +181,13 @@ public class PubsubApiaryClient extends PubsubClient {
       checkState(!Strings.isNullOrEmpty(ackId));
 
       // Record id, if any.
-      @Nullable byte[] recordId = null;
+      @Nullable String recordId = null;
       if (idLabel != null && attributes != null) {
-        String recordIdString = attributes.get(idLabel);
-        if (!Strings.isNullOrEmpty(recordIdString)) {
-          recordId = recordIdString.getBytes();
-        }
+        recordId = attributes.get(idLabel);
       }
-      if (recordId == null) {
-        recordId = pubsubMessage.getMessageId().getBytes();
+      if (Strings.isNullOrEmpty(recordId)) {
+        // Fall back to the Pubsub provided message id.
+        recordId = pubsubMessage.getMessageId();
       }
 
       incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
index dc4858e..07ce97d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
@@ -33,6 +33,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 
 /**
@@ -132,6 +133,12 @@ public abstract class PubsubClient implements Closeable {
       return path;
     }
 
+    public String getId() {
+      String[] splits = path.split("/");
+      checkState(splits.length == 2, "Malformed project path %s", path);
+      return splits[1];
+    }
+
     @Override
     public boolean equals(Object o) {
       if (this == o) {
@@ -180,6 +187,12 @@ public abstract class PubsubClient implements Closeable {
       return path;
     }
 
+    public String getName() {
+      String[] splits = path.split("/");
+      checkState(splits.length == 4, "Malformed subscription path %s", path);
+      return splits[3];
+    }
+
     public String getV1Beta1Path() {
       String[] splits = path.split("/");
       checkState(splits.length == 4, "Malformed subscription path %s", path);
@@ -233,6 +246,12 @@ public abstract class PubsubClient implements Closeable {
       return path;
     }
 
+    public String getName() {
+      String[] splits = path.split("/");
+      checkState(splits.length == 4, "Malformed topic path %s", path);
+      return splits[3];
+    }
+
     public String getV1Beta1Path() {
       String[] splits = path.split("/");
       checkState(splits.length == 4, "Malformed topic path %s", path);
@@ -286,11 +305,18 @@ public abstract class PubsubClient implements Closeable {
      */
     public final long timestampMsSinceEpoch;
 
-    // TODO: Support a record id.
+    /**
+     * If using an id label, the record id to associate with this record's metadata so the receiver
+     * can reject duplicates. Otherwise {@literal null}.
+     */
+    @Nullable
+    public final String recordId;
 
-    public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) {
+    public OutgoingMessage(
+        byte[] elementBytes, long timestampMsSinceEpoch, @Nullable String recordId) {
       this.elementBytes = elementBytes;
       this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+      this.recordId = recordId;
     }
 
     @Override
@@ -310,16 +336,14 @@ public abstract class PubsubClient implements Closeable {
 
       OutgoingMessage that = (OutgoingMessage) o;
 
-      if (timestampMsSinceEpoch != that.timestampMsSinceEpoch) {
-        return false;
-      }
-      return Arrays.equals(elementBytes, that.elementBytes);
-
+      return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+             && Arrays.equals(elementBytes, that.elementBytes)
+             && Objects.equal(recordId, that.recordId);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch);
+      return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch, recordId);
     }
   }
 
@@ -353,14 +377,14 @@ public abstract class PubsubClient implements Closeable {
     /**
      * Id to pass to the runner to distinguish this message from all others.
      */
-    public final byte[] recordId;
+    public final String recordId;
 
     public IncomingMessage(
         byte[] elementBytes,
         long timestampMsSinceEpoch,
         long requestTimeMsSinceEpoch,
         String ackId,
-        byte[] recordId) {
+        String recordId) {
       this.elementBytes = elementBytes;
       this.timestampMsSinceEpoch = timestampMsSinceEpoch;
       this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
@@ -390,26 +414,18 @@ public abstract class PubsubClient implements Closeable {
 
       IncomingMessage that = (IncomingMessage) o;
 
-      if (timestampMsSinceEpoch != that.timestampMsSinceEpoch) {
-        return false;
-      }
-      if (requestTimeMsSinceEpoch != that.requestTimeMsSinceEpoch) {
-        return false;
-      }
-      if (!Arrays.equals(elementBytes, that.elementBytes)) {
-        return false;
-      }
-      if (!ackId.equals(that.ackId)) {
-        return false;
-      }
-      return Arrays.equals(recordId, that.recordId);
+      return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+             && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch
+             && ackId.equals(that.ackId)
+             && recordId.equals(that.recordId)
+             && Arrays.equals(elementBytes, that.elementBytes);
     }
 
     @Override
     public int hashCode() {
       return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch,
                               requestTimeMsSinceEpoch,
-                              ackId, Arrays.hashCode(recordId));
+                              ackId, recordId);
     }
   }
 
@@ -485,6 +501,22 @@ public abstract class PubsubClient implements Closeable {
       TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;
 
   /**
+   * Create a random subscription for {@code topic}. Return the {@link SubscriptionPath}. It
+   * is the responsibility of the caller to later delete the subscription.
+   *
+   * @throws IOException
+   */
+  public SubscriptionPath createRandomSubscription(
+      ProjectPath project, TopicPath topic, int ackDeadlineSeconds) throws IOException {
+    // Create a randomized subscription derived from the topic name.
+    String subscriptionName = topic.getName() + "_beam_" + ThreadLocalRandom.current().nextLong();
+    SubscriptionPath subscription =
+        PubsubClient.subscriptionPathFromName(project.getId(), subscriptionName);
+    createSubscription(topic, subscription, ackDeadlineSeconds);
+    return subscription;
+  }
+
+  /**
    * Delete {@code subscription}.
    *
    * @throws IOException
@@ -507,7 +539,7 @@ public abstract class PubsubClient implements Closeable {
   public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException;
 
   /**
-   * Return {@literal true} if {@link pull} will always return empty list. Actual clients
+   * Return {@literal true} if {@link #pull} will always return an empty list. Actual clients
    * will return {@literal false}. Test clients may return {@literal true} to signal that all
    * expected messages have been pulled and the test may complete.
    */
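
As a worked example of the new path helpers and createRandomSubscription added above (illustrative
values only; client, project and topic are assumed to be in scope, and 60 is an arbitrary ack
deadline in seconds):

    // For topic projects/my-project/topics/events:
    //   project.getId()  -> "my-project"
    //   topic.getName()  -> "events"
    SubscriptionPath sub = client.createRandomSubscription(project, topic, 60);
    // sub is of the form projects/my-project/subscriptions/events_beam_<random long>;
    // the caller is responsible for deleting it later.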

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
index e759513..ac157fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
@@ -27,7 +27,6 @@ import com.google.auth.oauth2.GoogleCredentials;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
-import com.google.common.hash.Hashing;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Timestamp;
 import com.google.pubsub.v1.AcknowledgeRequest;
@@ -257,10 +256,8 @@ public class PubsubGrpcClient extends PubsubClient {
                .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
       }
 
-      if (idLabel != null) {
-        message.getMutableAttributes()
-               .put(idLabel,
-                    Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString());
+      if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+        message.getMutableAttributes().put(idLabel, outgoingMessage.recordId);
       }
 
       request.addMessages(message);
@@ -308,15 +305,13 @@ public class PubsubGrpcClient extends PubsubClient {
       checkState(!Strings.isNullOrEmpty(ackId));
 
       // Record id, if any.
-      @Nullable byte[] recordId = null;
+      @Nullable String recordId = null;
       if (idLabel != null && attributes != null) {
-        String recordIdString = attributes.get(idLabel);
-        if (recordIdString != null && !recordIdString.isEmpty()) {
-          recordId = recordIdString.getBytes();
-        }
+        recordId = attributes.get(idLabel);
       }
-      if (recordId == null) {
-        recordId = pubsubMessage.getMessageId().getBytes();
+      if (Strings.isNullOrEmpty(recordId)) {
+        // Fall back to the Pubsub provided message id.
+        recordId = pubsubMessage.getMessageId();
       }
 
       incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
index c1dfa06..9fa0380 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
@@ -46,10 +46,9 @@ public class PubsubTestClient extends PubsubClient {
    * Mimic the state of the simulated Pubsub 'service'.
    *
    * Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running
-   * test
-   * pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created from
-   * the same client factory and run in parallel. Thus we can't enforce aliasing of the following
-   * data structures over all clients and must resort to a static.
+   * test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created
+   * from the same client factory and run in parallel. Thus we can't guarantee the following data
+   * structures are shared by all clients through instance fields, and must resort to a static.
    */
   private static class State {
     /**
@@ -70,6 +69,13 @@ public class PubsubTestClient extends PubsubClient {
     Set<OutgoingMessage> remainingExpectedOutgoingMessages;
 
     /**
+     * Publish mode only: Messages whose first publish attempt should throw, in order to
+     * simulate a transient publish failure.
+     */
+    @Nullable
+    Set<OutgoingMessage> remainingFailingOutgoingMessages;
+
+    /**
      * Pull mode only: Clock from which to get current time.
      */
     @Nullable
@@ -119,11 +125,13 @@ public class PubsubTestClient extends PubsubClient {
    */
   public static PubsubTestClientFactory createFactoryForPublish(
       final TopicPath expectedTopic,
-      final Iterable<OutgoingMessage> expectedOutgoingMessages) {
+      final Iterable<OutgoingMessage> expectedOutgoingMessages,
+      final Iterable<OutgoingMessage> failingOutgoingMessages) {
     synchronized (STATE) {
       checkState(!STATE.isActive, "Test still in flight");
       STATE.expectedTopic = expectedTopic;
       STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
+      STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages);
       STATE.isActive = true;
     }
     return new PubsubTestClientFactory() {
@@ -257,6 +265,9 @@ public class PubsubTestClient extends PubsubClient {
       checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic,
                  STATE.expectedTopic);
       for (OutgoingMessage outgoingMessage : outgoingMessages) {
+        if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) {
+          throw new RuntimeException("Simulating failure for " + outgoingMessage);
+        }
         checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage),
                    "Unexpected outgoing message %s", outgoingMessage);
       }
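
A short sketch (not part of the commit) of how the new failing-message collection could be
exercised, assuming the TOPIC constant and imports from the sink test below; a message listed in
both collections throws on its first publish and must then be successfully republished before the
factory is closed:

    OutgoingMessage ok    = new OutgoingMessage("a".getBytes(), 1234L, "id-a");
    OutgoingMessage flaky = new OutgoingMessage("b".getBytes(), 1234L, "id-b");
    try (PubsubTestClientFactory factory =
             PubsubTestClient.createFactoryForPublish(
                 TOPIC,
                 ImmutableList.of(ok, flaky),  // all messages expected to be published
                 ImmutableList.of(flaky))) {   // first publish of this one throws
      // ... publish through a PubsubUnboundedSink as in the tests below; the runner must
      // retry the failed bundle for the expectation on 'flaky' to be satisfied at close().
    }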

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
index b4ef785..bf70e47 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
@@ -19,6 +19,7 @@
 package org.apache.beam.sdk.io;
 
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.PubsubUnboundedSink.RecordIdMethod;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -31,7 +32,7 @@ import org.apache.beam.sdk.util.PubsubTestClient;
 import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
 
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -41,9 +42,7 @@ import org.junit.runners.JUnit4;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 /**
  * Test PubsubUnboundedSink.
@@ -55,6 +54,7 @@ public class PubsubUnboundedSinkTest {
   private static final long TIMESTAMP = 1234L;
   private static final String TIMESTAMP_LABEL = "timestamp";
   private static final String ID_LABEL = "id";
+  private static final int NUM_SHARDS = 10;
 
   private static class Stamp extends DoFn<String, String> {
     @Override
@@ -63,22 +63,30 @@ public class PubsubUnboundedSinkTest {
     }
   }
 
+  private String getRecordId(String data) {
+    return Hashing.murmur3_128().hashBytes(data.getBytes()).toString();
+  }
+
   @Test
   public void saneCoder() throws Exception {
-    OutgoingMessage message = new OutgoingMessage(DATA.getBytes(), TIMESTAMP);
+    OutgoingMessage message = new OutgoingMessage(DATA.getBytes(), TIMESTAMP, getRecordId(DATA));
     CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message);
     CoderProperties.coderSerializable(PubsubUnboundedSink.CODER);
   }
 
   @Test
   public void sendOneMessage() throws IOException {
-    Set<OutgoingMessage> outgoing =
-        Sets.newHashSet(new OutgoingMessage(DATA.getBytes(), TIMESTAMP));
+    List<OutgoingMessage> outgoing =
+        ImmutableList.of(new OutgoingMessage(DATA.getBytes(), TIMESTAMP, getRecordId(DATA)));
+    int batchSize = 1;
+    int batchBytes = 1;
     try (PubsubTestClientFactory factory =
-             PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) {
+             PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
+                                                      ImmutableList.<OutgoingMessage>of())) {
       PubsubUnboundedSink<String> sink =
           new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
-                                    10);
+                                    NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
+                                    RecordIdMethod.DETERMINISTIC);
       TestPipeline p = TestPipeline.create();
       p.apply(Create.of(ImmutableList.of(DATA)))
        .apply(ParDo.of(new Stamp()))
@@ -91,20 +99,22 @@ public class PubsubUnboundedSinkTest {
 
   @Test
   public void sendMoreThanOneBatchByNumMessages() throws IOException {
-    Set<OutgoingMessage> outgoing = new HashSet<>();
+    List<OutgoingMessage> outgoing = new ArrayList<>();
     List<String> data = new ArrayList<>();
     int batchSize = 2;
     int batchBytes = 1000;
     for (int i = 0; i < batchSize * 10; i++) {
       String str = String.valueOf(i);
-      outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP));
+      outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP, getRecordId(str)));
       data.add(str);
     }
     try (PubsubTestClientFactory factory =
-             PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) {
+             PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
+                                                      ImmutableList.<OutgoingMessage>of())) {
       PubsubUnboundedSink<String> sink =
           new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
-                                    10, batchSize, batchBytes, Duration.standardSeconds(2));
+                                    NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
+                                    RecordIdMethod.DETERMINISTIC);
       TestPipeline p = TestPipeline.create();
       p.apply(Create.of(data))
        .apply(ParDo.of(new Stamp()))
@@ -117,7 +127,7 @@ public class PubsubUnboundedSinkTest {
 
   @Test
   public void sendMoreThanOneBatchByByteSize() throws IOException {
-    Set<OutgoingMessage> outgoing = new HashSet<>();
+    List<OutgoingMessage> outgoing = new ArrayList<>();
     List<String> data = new ArrayList<>();
     int batchSize = 100;
     int batchBytes = 10;
@@ -128,15 +138,17 @@ public class PubsubUnboundedSinkTest {
         sb.append(String.valueOf(n));
       }
       String str = sb.toString();
-      outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP));
+      outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP, getRecordId(str)));
       data.add(str);
       n += str.length();
     }
     try (PubsubTestClientFactory factory =
-             PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) {
+             PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
+                                                      ImmutableList.<OutgoingMessage>of())) {
       PubsubUnboundedSink<String> sink =
           new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
-                                    10, batchSize, batchBytes, Duration.standardSeconds(2));
+                                    NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
+                                    RecordIdMethod.DETERMINISTIC);
       TestPipeline p = TestPipeline.create();
       p.apply(Create.of(data))
        .apply(ParDo.of(new Stamp()))
@@ -146,4 +158,8 @@ public class PubsubUnboundedSinkTest {
     // The PubsubTestClientFactory will assert fail on close if the actual published
     // message does not match the expected publish message.
   }
+
+  // TODO: We would like to test that failed Pubsub publish calls cause the already assigned
+  // (and random) record ids to be reused. However that can't be done without the test runner
+  // supporting retrying bundles.
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
index b265d18..3b0a1c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
@@ -86,14 +86,14 @@ public class PubsubUnboundedSourceTest {
     };
     factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming);
     PubsubUnboundedSource<String> source =
-        new PubsubUnboundedSource<>(clock, factory, SUBSCRIPTION, StringUtf8Coder.of(),
+        new PubsubUnboundedSource<>(clock, factory, null, null, SUBSCRIPTION, StringUtf8Coder.of(),
                                     TIMESTAMP_LABEL, ID_LABEL);
     primSource = new PubsubSource<>(source);
   }
 
   private void setupOneMessage() {
     setupOneMessage(ImmutableList.of(
-        new IncomingMessage(DATA.getBytes(), TIMESTAMP, 0, ACK_ID, RECORD_ID.getBytes())));
+        new IncomingMessage(DATA.getBytes(), TIMESTAMP, 0, ACK_ID, RECORD_ID)));
   }
 
   @After
@@ -211,7 +211,7 @@ public class PubsubUnboundedSourceTest {
     for (int i = 0; i < 2; i++) {
       String data = String.format("data_%d", i);
       String ackid = String.format("ackid_%d", i);
-      incoming.add(new IncomingMessage(data.getBytes(), TIMESTAMP, 0, ackid, RECORD_ID.getBytes()));
+      incoming.add(new IncomingMessage(data.getBytes(), TIMESTAMP, 0, ackid, RECORD_ID));
     }
     setupOneMessage(incoming);
     TestPipeline p = TestPipeline.create();
@@ -272,7 +272,7 @@ public class PubsubUnboundedSourceTest {
       String recid = String.format("recordid_%d", messageNum);
       String ackId = String.format("ackid_%d", messageNum);
       incoming.add(new IncomingMessage(data.getBytes(), messageNumToTimestamp(messageNum), 0,
-                                       ackId, recid.getBytes()));
+                                       ackId, recid));
     }
     setupOneMessage(incoming);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
index 40c31fb..0f3a7bb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
@@ -34,7 +34,6 @@ import com.google.api.services.pubsub.model.PullResponse;
 import com.google.api.services.pubsub.model.ReceivedMessage;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.hash.Hashing;
 
 import org.junit.After;
 import org.junit.Before;
@@ -61,8 +60,7 @@ public class PubsubApiaryClientTest {
   private static final String ID_LABEL = "id";
   private static final String MESSAGE_ID = "testMessageId";
   private static final String DATA = "testData";
-  private static final String CUSTOM_ID =
-      Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString();
+  private static final String RECORD_ID = "testRecordId";
   private static final String ACK_ID = "testAckId";
 
   @Before
@@ -89,7 +87,7 @@ public class PubsubApiaryClientTest {
         .setPublishTime(String.valueOf(PUB_TIME))
         .setAttributes(
             ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
-                            ID_LABEL, CUSTOM_ID));
+                            ID_LABEL, RECORD_ID));
     ReceivedMessage expectedReceivedMessage =
         new ReceivedMessage().setMessage(expectedPubsubMessage)
                              .setAckId(ACK_ID);
@@ -105,7 +103,7 @@ public class PubsubApiaryClientTest {
     IncomingMessage actualMessage = acutalMessages.get(0);
     assertEquals(ACK_ID, actualMessage.ackId);
     assertEquals(DATA, new String(actualMessage.elementBytes));
-    assertEquals(CUSTOM_ID, new String(actualMessage.recordId));
+    assertEquals(RECORD_ID, actualMessage.recordId);
     assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
     assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
   }
@@ -117,7 +115,7 @@ public class PubsubApiaryClientTest {
         .encodeData(DATA.getBytes())
         .setAttributes(
             ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
-                            ID_LABEL, CUSTOM_ID));
+                            ID_LABEL, RECORD_ID));
     PublishRequest expectedRequest = new PublishRequest()
         .setMessages(ImmutableList.of(expectedPubsubMessage));
     PublishResponse expectedResponse = new PublishResponse()
@@ -127,7 +125,7 @@ public class PubsubApiaryClientTest {
                            .publish(expectedTopic, expectedRequest)
                            .execute())
            .thenReturn(expectedResponse);
-    OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
+    OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, RECORD_ID);
     int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
     assertEquals(1, n);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
index 189049c..71ee27c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.util.PubsubClient.TopicPath;
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.hash.Hashing;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Timestamp;
 import com.google.pubsub.v1.PublishRequest;
@@ -70,8 +69,7 @@ public class PubsubGrpcClientTest {
   private static final String ID_LABEL = "id";
   private static final String MESSAGE_ID = "testMessageId";
   private static final String DATA = "testData";
-  private static final String CUSTOM_ID =
-      Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString();
+  private static final String RECORD_ID = "testRecordId";
   private static final String ACK_ID = "testAckId";
 
   @Before
@@ -118,7 +116,7 @@ public class PubsubGrpcClientTest {
                      .putAllAttributes(
                          ImmutableMap.of(TIMESTAMP_LABEL,
                                          String.valueOf(MESSAGE_TIME),
-                                         ID_LABEL, CUSTOM_ID))
+                                         ID_LABEL, RECORD_ID))
                      .build();
     ReceivedMessage expectedReceivedMessage =
         ReceivedMessage.newBuilder()
@@ -136,7 +134,7 @@ public class PubsubGrpcClientTest {
     IncomingMessage actualMessage = acutalMessages.get(0);
     assertEquals(ACK_ID, actualMessage.ackId);
     assertEquals(DATA, new String(actualMessage.elementBytes));
-    assertEquals(CUSTOM_ID, new String(actualMessage.recordId));
+    assertEquals(RECORD_ID, actualMessage.recordId);
     assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
     assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
   }
@@ -149,7 +147,7 @@ public class PubsubGrpcClientTest {
                      .setData(ByteString.copyFrom(DATA.getBytes()))
                      .putAllAttributes(
                          ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
-                                         ID_LABEL, CUSTOM_ID))
+                                         ID_LABEL, RECORD_ID))
                      .build();
     PublishRequest expectedRequest =
         PublishRequest.newBuilder()
@@ -163,7 +161,7 @@ public class PubsubGrpcClientTest {
                        .build();
     Mockito.when(mockPublisherStub.publish(expectedRequest))
            .thenReturn(expectedResponse);
-    OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
+    OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, RECORD_ID);
     int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
     assertEquals(1, n);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
index fedc8bf..d788f10 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
@@ -61,7 +61,7 @@ public class PubsubTestClientTest {
       }
     };
     IncomingMessage expectedIncomingMessage =
-        new IncomingMessage(DATA.getBytes(), MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID.getBytes());
+        new IncomingMessage(DATA.getBytes(), MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID);
     try (PubsubTestClientFactory factory =
              PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S,
                                                    Lists.newArrayList(expectedIncomingMessage))) {
@@ -99,9 +99,13 @@ public class PubsubTestClientTest {
 
   @Test
   public void publishOneMessage() throws IOException {
-    OutgoingMessage expectedOutgoingMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
-    try (PubsubTestClientFactory factory = PubsubTestClient.createFactoryForPublish(TOPIC, Sets
-        .newHashSet(expectedOutgoingMessage))) {
+    OutgoingMessage expectedOutgoingMessage =
+        new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, MESSAGE_ID);
+    try (PubsubTestClientFactory factory =
+             PubsubTestClient.createFactoryForPublish(
+                 TOPIC,
+                 Sets.newHashSet(expectedOutgoingMessage),
+                 ImmutableList.<OutgoingMessage>of())) {
       try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) {
         client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage));
       }