You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/03 06:28:20 UTC

[3/4] beam git commit: Removed coder and parseFn from PubsubIO.Read

Removed coder and parseFn from PubsubIO.Read


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/25dc94bc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/25dc94bc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/25dc94bc

Branch: refs/heads/master
Commit: 25dc94bc971a0cb8848777c48495929f6dbe8f69
Parents: dc0fdcb
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 19:03:27 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 23:08:29 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  49 ++++---
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 116 +++++++++++----
 .../pubsub/PubsubMessagePayloadOnlyCoder.java   |  48 +++++++
 .../PubsubMessageWithAttributesCoder.java       |  57 ++++++++
 .../io/gcp/pubsub/PubsubUnboundedSource.java    | 141 +++++++------------
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java    |  32 ++---
 .../gcp/pubsub/PubsubUnboundedSourceTest.java   | 108 +++++++-------
 7 files changed, 350 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 68cc8e8..6f29797 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -86,6 +86,8 @@ import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -103,6 +105,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -856,29 +859,30 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   // ================================================================================
 
   /**
-   * Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we
-   * can instead defer to Windmill's implementation.
+   * Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we can
+   * instead defer to Windmill's implementation.
    */
-  private static class StreamingPubsubIORead<T> extends PTransform<PBegin, PCollection<T>> {
-    private final PubsubUnboundedSource<T> transform;
+  private static class StreamingPubsubIORead
+      extends PTransform<PBegin, PCollection<PubsubIO.PubsubMessage>> {
+    private final PubsubUnboundedSource transform;
 
     /**
      * Builds an instance of this class from the overridden transform.
      */
     public StreamingPubsubIORead(
-        DataflowRunner runner, PubsubUnboundedSource<T> transform) {
+        DataflowRunner runner, PubsubUnboundedSource transform) {
       this.transform = transform;
     }
 
-    PubsubUnboundedSource<T> getOverriddenTransform() {
+    PubsubUnboundedSource getOverriddenTransform() {
       return transform;
     }
 
     @Override
-    public PCollection<T> expand(PBegin input) {
-      return PCollection.<T>createPrimitiveOutputInternal(
+    public PCollection<PubsubIO.PubsubMessage> expand(PBegin input) {
+      return PCollection.<PubsubIO.PubsubMessage>createPrimitiveOutputInternal(
           input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-          .setCoder(transform.getElementCoder());
+          .setCoder(new PubsubMessageWithAttributesCoder());
     }
 
     @Override
@@ -888,19 +892,19 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     static {
       DataflowPipelineTranslator.registerTransformTranslator(
-          StreamingPubsubIORead.class, new StreamingPubsubIOReadTranslator<>());
+          StreamingPubsubIORead.class, new StreamingPubsubIOReadTranslator());
     }
   }
 
   /** Rewrite {@link StreamingPubsubIORead} to the appropriate internal node. */
-  private static class StreamingPubsubIOReadTranslator<T>
-      implements TransformTranslator<StreamingPubsubIORead<T>> {
+  private static class StreamingPubsubIOReadTranslator
+      implements TransformTranslator<StreamingPubsubIORead> {
     @Override
-    public void translate(StreamingPubsubIORead<T> transform, TranslationContext context) {
+    public void translate(StreamingPubsubIORead transform, TranslationContext context) {
       checkArgument(
           context.getPipelineOptions().isStreaming(),
           "StreamingPubsubIORead is only for streaming pipelines.");
-      PubsubUnboundedSource<T> overriddenTransform = transform.getOverriddenTransform();
+      PubsubUnboundedSource overriddenTransform = transform.getOverriddenTransform();
       StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
       stepContext.addInput(PropertyNames.FORMAT, "pubsub");
       if (overriddenTransform.getTopicProvider() != null) {
@@ -932,16 +936,29 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         stepContext.addInput(
             PropertyNames.PUBSUB_ID_ATTRIBUTE, overriddenTransform.getIdAttribute());
       }
-      if (overriddenTransform.getWithAttributesParseFn() != null) {
+      // In both cases, the transform needs to read PubsubMessage objects. However, if it needs
+      // the attributes, we supply an identity "parse fn" so the worker will read PubsubMessages
+      // from Windmill and simply pass them around; if it doesn't need attributes, we're
+      // already implicitly using a "Coder" that interprets the data as a PubsubMessage's
+      // payload.
+      if (overriddenTransform.getNeedsAttributes()) {
         stepContext.addInput(
             PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
             byteArrayToJsonString(
-                serializeToByteArray(overriddenTransform.getWithAttributesParseFn())));
+                serializeToByteArray(new IdentityMessageFn())));
       }
       stepContext.addOutput(context.getOutput(transform));
     }
   }
 
+  private static class IdentityMessageFn
+      extends SimpleFunction<PubsubIO.PubsubMessage, PubsubIO.PubsubMessage> {
+    @Override
+    public PubsubIO.PubsubMessage apply(PubsubIO.PubsubMessage input) {
+      return input;
+    }
+  }
+
   /**
    * Suppress application of {@link PubsubUnboundedSink#expand} in streaming mode so that we
    * can instead defer to Windmill's implementation.

http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 129a25f..af8b7d6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -24,6 +24,7 @@ import com.google.auto.value.AutoValue;
 import com.google.protobuf.Message;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +33,7 @@ import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
@@ -44,6 +46,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -461,8 +464,34 @@ public class PubsubIO {
   }
 
    /** Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. */
-  public static <T> Read<T> read() {
-    return new AutoValue_PubsubIO_Read.Builder<T>().build();
+  private static <T> Read<T> read() {
+    return new AutoValue_PubsubIO_Read.Builder<T>().setNeedsAttributes(false).build();
+  }
+
+  /**
+   * Returns a {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The
+   * messages will only contain a {@link PubsubMessage#getMessage() payload}, but no {@link
+   * PubsubMessage#getAttributeMap() attributes}.
+   */
+  public static Read<PubsubMessage> readPubsubMessagesWithoutAttributes() {
+    return new AutoValue_PubsubIO_Read.Builder<PubsubMessage>()
+        .setCoder(PubsubMessagePayloadOnlyCoder.of())
+        .setParseFn(new IdentityMessageFn())
+        .setNeedsAttributes(false)
+        .build();
+  }
+
+  /**
+   * Returns a {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The
+   * messages will contain both a {@link PubsubMessage#getMessage() payload} and {@link
+   * PubsubMessage#getAttributeMap() attributes}.
+   */
+  public static Read<PubsubMessage> readPubsubMessagesWithAttributes() {
+    return new AutoValue_PubsubIO_Read.Builder<PubsubMessage>()
+        .setCoder(PubsubMessageWithAttributesCoder.of())
+        .setParseFn(new IdentityMessageFn())
+        .setNeedsAttributes(true)
+        .build();
   }
 
   /**
@@ -470,7 +499,8 @@ public class PubsubIO {
    * Pub/Sub stream.
    */
   public static Read<String> readStrings() {
-    return PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
+    return PubsubIO.<String>read().withCoderAndParseFn(
+        StringUtf8Coder.of(), new ParsePayloadAsUtf8());
   }
 
   /**
@@ -478,7 +508,11 @@ public class PubsubIO {
    * given type from a Google Cloud Pub/Sub stream.
    */
   public static <T extends Message> Read<T> readProtos(Class<T> messageClass) {
-    return PubsubIO.<T>read().withCoder(ProtoCoder.of(messageClass));
+    // TODO: Stop using ProtoCoder and instead parse the payload directly.
+    // We should not be relying on the fact that ProtoCoder's wire format is identical to
+    // the protobuf wire format, as the wire format is not part of a coder's API.
+    ProtoCoder<T> coder = ProtoCoder.of(messageClass);
+    return PubsubIO.<T>read().withCoderAndParseFn(coder, new ParsePayloadUsingCoder<>(coder));
   }
 
   /**
@@ -486,7 +520,11 @@ public class PubsubIO {
    * given type from a Google Cloud Pub/Sub stream.
    */
   public static <T extends Message> Read<T> readAvros(Class<T> clazz) {
-    return PubsubIO.<T>read().withCoder(AvroCoder.of(clazz));
+    // TODO: Stop using AvroCoder and instead parse the payload directly.
+    // We should not be relying on the fact that AvroCoder's wire format is identical to
+    // the Avro wire format, as the wire format is not part of a coder's API.
+    AvroCoder<T> coder = AvroCoder.of(clazz);
+    return PubsubIO.<T>read().withCoderAndParseFn(coder, new ParsePayloadUsingCoder<>(coder));
   }
 
   /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */
@@ -543,6 +581,8 @@ public class PubsubIO {
     @Nullable
     abstract SimpleFunction<PubsubMessage, T> getParseFn();
 
+    abstract boolean getNeedsAttributes();
+
     abstract Builder<T> toBuilder();
 
     @AutoValue.Builder
@@ -559,6 +599,8 @@ public class PubsubIO {
 
       abstract Builder<T> setParseFn(SimpleFunction<PubsubMessage, T> parseFn);
 
+      abstract Builder<T> setNeedsAttributes(boolean needsAttributes);
+
       abstract Read<T> build();
     }
 
@@ -665,20 +707,13 @@ public class PubsubIO {
     }
 
     /**
-     * Uses the given {@link Coder} to decode each record into a value of type {@code T}.
-     */
-    public Read<T> withCoder(Coder<T> coder) {
-      return toBuilder().setCoder(coder).build();
-    }
-
-    /**
      * Causes the source to return a PubsubMessage that includes Pubsub attributes, and uses the
      * given parsing function to transform the PubsubMessage into an output type.
      * A Coder for the output type T must be registered or set on the output via
      * {@link PCollection#setCoder(Coder)}.
      */
-    public Read<T> withParseFn(SimpleFunction<PubsubMessage, T> parseFn) {
-      return toBuilder().setParseFn(parseFn).build();
+    private Read<T> withCoderAndParseFn(Coder<T> coder, SimpleFunction<PubsubMessage, T> parseFn) {
+      return toBuilder().setCoder(coder).setParseFn(parseFn).build();
     }
 
     @Override
@@ -691,10 +726,6 @@ public class PubsubIO {
         throw new IllegalStateException(
             "Can't set both the topic and the subscription for " + "a PubsubIO.Read transform");
       }
-      if (getCoder() == null) {
-        throw new IllegalStateException(
-            "PubsubIO.Read requires that a coder be set using " + "the withCoder method.");
-      }
 
       @Nullable
       ValueProvider<ProjectPath> projectPath =
@@ -711,17 +742,23 @@ public class PubsubIO {
           getSubscriptionProvider() == null
               ? null
               : NestedValueProvider.of(getSubscriptionProvider(), new SubscriptionPathTranslator());
-      PubsubUnboundedSource<T> source =
-          new PubsubUnboundedSource<T>(
+      PubsubUnboundedSource source =
+          new PubsubUnboundedSource(
               FACTORY,
               projectPath,
               topicPath,
               subscriptionPath,
-              getCoder(),
               getTimestampAttribute(),
               getIdAttribute(),
-              getParseFn());
-      return input.getPipeline().apply(source);
+              getNeedsAttributes());
+      return input
+          .getPipeline()
+          .apply(source)
+          .setCoder(
+              getNeedsAttributes()
+                  ? PubsubMessageWithAttributesCoder.of()
+                  : PubsubMessagePayloadOnlyCoder.of())
+          .apply(MapElements.via(getParseFn()));
     }
 
     @Override
@@ -847,7 +884,7 @@ public class PubsubIO {
      * function translates the input type T to a PubsubMessage object, which is used by the sink
      * to separately set the PubSub message's payload and attributes.
      */
-    public Write<T> withFormatFn(SimpleFunction<T, PubsubMessage> formatFn) {
+    private Write<T> withFormatFn(SimpleFunction<T, PubsubMessage> formatFn) {
       return toBuilder().setFormatFn(formatFn).build();
     }
 
@@ -954,4 +991,35 @@ public class PubsubIO {
       }
     }
   }
+
+  private static class ParsePayloadAsUtf8 extends SimpleFunction<PubsubMessage, String> {
+    @Override
+    public String apply(PubsubMessage input) {
+      return new String(input.getMessage(), StandardCharsets.UTF_8);
+    }
+  }
+
+  private static class ParsePayloadUsingCoder<T> extends SimpleFunction<PubsubMessage, T> {
+    private Coder<T> coder;
+
+    public ParsePayloadUsingCoder(Coder<T> coder) {
+      this.coder = coder;
+    }
+
+    @Override
+    public T apply(PubsubMessage input) {
+      try {
+        return CoderUtils.decodeFromByteArray(coder, input.getMessage());
+      } catch (CoderException e) {
+        throw new RuntimeException("Could not decode Pubsub message", e);
+      }
+    }
+  }
+
+  private static class IdentityMessageFn extends SimpleFunction<PubsubMessage, PubsubMessage> {
+    @Override
+    public PubsubMessage apply(PubsubMessage input) {
+      return input;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
new file mode 100644
index 0000000..f0dae46
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.util.StreamUtils;
+
+/** A coder for PubsubMessage treating the raw bytes being decoded as the message's payload. */
+public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubIO.PubsubMessage> {
+  public static PubsubMessagePayloadOnlyCoder of() {
+    return new PubsubMessagePayloadOnlyCoder();
+  }
+
+  @Override
+  public void encode(PubsubIO.PubsubMessage value, OutputStream outStream, Context context)
+      throws IOException {
+    checkState(context.isWholeStream, "Expected to only be used in a whole-stream context");
+    outStream.write(value.getMessage());
+  }
+
+  @Override
+  public PubsubIO.PubsubMessage decode(InputStream inStream, Context context) throws IOException {
+    checkState(context.isWholeStream, "Expected to only be used in a whole-stream context");
+    return new PubsubIO.PubsubMessage(
+        StreamUtils.getBytes(inStream), ImmutableMap.<String, String>of());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
new file mode 100644
index 0000000..27f0f02
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/** A coder for PubsubMessage including attributes. */
+public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubIO.PubsubMessage> {
+  private static final Coder<byte[]> PAYLOAD_CODER =
+      NullableCoder.of(ByteArrayCoder.of());
+  private static final Coder<Map<String, String>> ATTRIBUTES_CODER = MapCoder.of(
+      StringUtf8Coder.of(), StringUtf8Coder.of());
+
+  public static PubsubMessageWithAttributesCoder of() {
+    return new PubsubMessageWithAttributesCoder();
+  }
+
+  public void encode(PubsubIO.PubsubMessage value, OutputStream outStream, Context context)
+      throws IOException {
+    PAYLOAD_CODER.encode(
+        value.getMessage(),
+        outStream,
+        context.nested());
+    ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream, context);
+  }
+
+  @Override
+  public PubsubIO.PubsubMessage decode(InputStream inStream, Context context) throws IOException {
+    byte[] payload = PAYLOAD_CODER.decode(inStream, context.nested());
+    Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context);
+    return new PubsubIO.PubsubMessage(payload, attributes);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 903ae41..d366949 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -47,7 +47,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
@@ -66,13 +65,11 @@ import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.BucketingFunction;
-import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.MovingFunction;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -110,7 +107,7 @@ import org.slf4j.LoggerFactory;
  * {@link UnboundedSource.UnboundedReader} instances to execute concurrently and thus hide latency.
  * </ul>
  */
-public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> {
+public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<PubsubIO.PubsubMessage>> {
   private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class);
 
   /**
@@ -121,7 +118,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   /**
    * Coder for checkpoints.
    */
-  private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = new PubsubCheckpointCoder<>();
+  private static final PubsubCheckpointCoder CHECKPOINT_CODER = new PubsubCheckpointCoder();
 
   /**
    * Maximum number of messages per pull.
@@ -231,7 +228,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
    * we need to restore.
    */
   @VisibleForTesting
-  static class PubsubCheckpoint<T> implements UnboundedSource.CheckpointMark {
+  static class PubsubCheckpoint implements UnboundedSource.CheckpointMark {
     /**
      * The {@link SubscriptionPath} to the subscription the reader is reading from. May be
      * {@code null} if the {@link PubsubUnboundedSource} contains the subscription.
@@ -247,7 +244,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
      * the 'true' active reader may have changed.
      */
     @Nullable
-    private PubsubReader<T> reader;
+    private PubsubReader reader;
 
     /**
      * If the checkpoint is for persisting: The ACK ids of messages which have been passed
@@ -268,7 +265,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
 
     public PubsubCheckpoint(
         @Nullable String subscriptionPath,
-        @Nullable PubsubReader<T> reader,
+        @Nullable PubsubReader reader,
         @Nullable List<String> safeToAckIds,
         List<String> notYetReadIds) {
       this.subscriptionPath = subscriptionPath;
@@ -327,7 +324,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     /**
      * Return current time according to {@code reader}.
      */
-    private static long now(PubsubReader<?> reader) {
+    private static long now(PubsubReader reader) {
       if (reader.outer.outer.clock == null) {
         return System.currentTimeMillis();
       } else {
@@ -340,7 +337,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
      * NACK all messages which have been read from Pubsub but not passed downstream.
      * This way Pubsub will send them again promptly.
      */
-    public void nackAll(PubsubReader<T> reader) throws IOException {
+    public void nackAll(PubsubReader reader) throws IOException {
       checkState(this.reader == null, "Cannot nackAll on persisting checkpoint");
       List<String> batchYetToAckIds =
           new ArrayList<>(Math.min(notYetReadIds.size(), ACK_BATCH_SIZE));
@@ -360,13 +357,13 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   }
 
   /** The coder for our checkpoints. */
-  private static class PubsubCheckpointCoder<T> extends CustomCoder<PubsubCheckpoint<T>> {
+  private static class PubsubCheckpointCoder extends CustomCoder<PubsubCheckpoint> {
     private static final Coder<String> SUBSCRIPTION_PATH_CODER =
         NullableCoder.of(StringUtf8Coder.of());
     private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());
 
     @Override
-    public void encode(PubsubCheckpoint<T> value, OutputStream outStream, Context context)
+    public void encode(PubsubCheckpoint value, OutputStream outStream, Context context)
         throws IOException {
       SUBSCRIPTION_PATH_CODER.encode(
           value.subscriptionPath,
@@ -376,10 +373,10 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     }
 
     @Override
-    public PubsubCheckpoint<T> decode(InputStream inStream, Context context) throws IOException {
+    public PubsubCheckpoint decode(InputStream inStream, Context context) throws IOException {
       String path = SUBSCRIPTION_PATH_CODER.decode(inStream, context.nested());
       List<String> notYetReadIds = LIST_CODER.decode(inStream, context);
-      return new PubsubCheckpoint<>(path, null, null, notYetReadIds);
+      return new PubsubCheckpoint(path, null, null, notYetReadIds);
     }
   }
 
@@ -392,16 +389,14 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
    * but not yet consumed downstream and/or ACKed back to Pubsub.
    */
   @VisibleForTesting
-  static class PubsubReader<T> extends UnboundedSource.UnboundedReader<T> {
+  static class PubsubReader extends UnboundedSource.UnboundedReader<PubsubIO.PubsubMessage> {
     /**
      * For access to topic and checkpointCoder.
      */
-    private final PubsubSource<T> outer;
+    private final PubsubSource outer;
     @VisibleForTesting
     final SubscriptionPath subscription;
 
-    private final SimpleFunction<PubsubIO.PubsubMessage, T> parseFn;
-
     /**
      * Client on which to talk to Pubsub. Contains a null value if the client has been closed.
      */
@@ -593,12 +588,10 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     /**
      * Construct a reader.
      */
-    public PubsubReader(PubsubOptions options, PubsubSource<T> outer, SubscriptionPath subscription,
-                        SimpleFunction<PubsubIO.PubsubMessage, T> parseFn)
+    public PubsubReader(PubsubOptions options, PubsubSource outer, SubscriptionPath subscription)
         throws IOException, GeneralSecurityException {
       this.outer = outer;
       this.subscription = subscription;
-      this.parseFn = parseFn;
       pubsubClient =
           new AtomicReference<>(
               outer.outer.pubsubFactory.newClient(
@@ -970,20 +963,11 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     }
 
     @Override
-    public T getCurrent() throws NoSuchElementException {
+    public PubsubIO.PubsubMessage getCurrent() throws NoSuchElementException {
       if (current == null) {
         throw new NoSuchElementException();
       }
-      try {
-        if (parseFn != null) {
-          return parseFn.apply(new PubsubIO.PubsubMessage(
-                  current.elementBytes, current.attributes));
-        } else {
-          return CoderUtils.decodeFromByteArray(outer.outer.elementCoder, current.elementBytes);
-        }
-      } catch (CoderException e) {
-        throw new RuntimeException("Unable to decode element from Pubsub message: ", e);
-      }
+      return new PubsubIO.PubsubMessage(current.elementBytes, current.attributes);
     }
 
     @Override
@@ -1031,7 +1015,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     }
 
     @Override
-    public PubsubSource<T> getCurrentSource() {
+    public PubsubSource getCurrentSource() {
       return outer;
     }
 
@@ -1073,7 +1057,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     }
 
     @Override
-    public PubsubCheckpoint<T> getCheckpointMark() {
+    public PubsubCheckpoint getCheckpointMark() {
       int cur = numInFlightCheckpoints.incrementAndGet();
       maxInFlightCheckpoints = Math.max(maxInFlightCheckpoints, cur);
       // It's possible for a checkpoint to be taken but never finalized.
@@ -1087,10 +1071,10 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       }
       if (outer.subscriptionPath == null) {
         // need to include the subscription in case we resume, as it's not stored in the source.
-        return new PubsubCheckpoint<>(
+        return new PubsubCheckpoint(
             subscription.getPath(), this, snapshotSafeToAckIds, snapshotNotYetReadIds);
       }
-      return new PubsubCheckpoint<>(null, this, snapshotSafeToAckIds, snapshotNotYetReadIds);
+      return new PubsubCheckpoint(null, this, snapshotSafeToAckIds, snapshotNotYetReadIds);
     }
 
     @Override
@@ -1104,28 +1088,28 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   // ================================================================================
 
   @VisibleForTesting
-  static class PubsubSource<T> extends UnboundedSource<T, PubsubCheckpoint<T>> {
-    public final PubsubUnboundedSource<T> outer;
+  static class PubsubSource extends UnboundedSource<PubsubIO.PubsubMessage, PubsubCheckpoint> {
+    public final PubsubUnboundedSource outer;
     // The subscription to read from.
     @VisibleForTesting
     final SubscriptionPath subscriptionPath;
 
-    public PubsubSource(PubsubUnboundedSource<T> outer) {
+    public PubsubSource(PubsubUnboundedSource outer) {
       this(outer, outer.getSubscription());
     }
 
-    private PubsubSource(PubsubUnboundedSource<T> outer, SubscriptionPath subscriptionPath) {
+    private PubsubSource(PubsubUnboundedSource outer, SubscriptionPath subscriptionPath) {
       this.outer = outer;
       this.subscriptionPath = subscriptionPath;
     }
 
     @Override
-    public List<PubsubSource<T>> split(
+    public List<PubsubSource> split(
         int desiredNumSplits, PipelineOptions options) throws Exception {
-      List<PubsubSource<T>> result = new ArrayList<>(desiredNumSplits);
-      PubsubSource<T> splitSource = this;
+      List<PubsubSource> result = new ArrayList<>(desiredNumSplits);
+      PubsubSource splitSource = this;
       if (subscriptionPath == null) {
-        splitSource = new PubsubSource<>(outer, outer.createRandomSubscription(options));
+        splitSource = new PubsubSource(outer, outer.createRandomSubscription(options));
       }
       for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) {
         // Since the source is immutable and Pubsub automatically shards we simply
@@ -1136,10 +1120,10 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     }
 
     @Override
-    public PubsubReader<T> createReader(
+    public PubsubReader createReader(
         PipelineOptions options,
-        @Nullable PubsubCheckpoint<T> checkpoint) {
-      PubsubReader<T> reader;
+        @Nullable PubsubCheckpoint checkpoint) {
+      PubsubReader reader;
       SubscriptionPath subscription = subscriptionPath;
       if (subscription == null) {
         if (checkpoint == null) {
@@ -1151,8 +1135,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
         }
       }
       try {
-        reader = new PubsubReader<>(options.as(PubsubOptions.class), this, subscription,
-                outer.parseFn);
+        reader = new PubsubReader(options.as(PubsubOptions.class), this, subscription);
       } catch (GeneralSecurityException | IOException e) {
         throw new RuntimeException("Unable to subscribe to " + subscriptionPath + ": ", e);
       }
@@ -1171,15 +1154,15 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
 
     @Nullable
     @Override
-    public Coder<PubsubCheckpoint<T>> getCheckpointMarkCoder() {
-      @SuppressWarnings("unchecked") PubsubCheckpointCoder<T> typedCoder =
-          (PubsubCheckpointCoder<T>) CHECKPOINT_CODER;
+    public Coder<PubsubCheckpoint> getCheckpointMarkCoder() {
+      @SuppressWarnings("unchecked") PubsubCheckpointCoder typedCoder =
+          (PubsubCheckpointCoder) CHECKPOINT_CODER;
       return typedCoder;
     }
 
     @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return outer.elementCoder;
+    public Coder<PubsubIO.PubsubMessage> getDefaultOutputCoder() {
+      return new PubsubMessageWithAttributesCoder();
     }
 
     @Override
@@ -1198,7 +1181,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   // StatsFn
   // ================================================================================
 
-  private static class StatsFn<T> extends DoFn<T, T> {
+  private static class StatsFn extends DoFn<PubsubIO.PubsubMessage, PubsubIO.PubsubMessage> {
     private final Counter elementCounter = SourceMetrics.elementsRead();
 
     private final PubsubClientFactory pubsubFactory;
@@ -1292,13 +1275,6 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   private ValueProvider<SubscriptionPath> subscription;
 
   /**
-   * Coder for elements. Elements are effectively double-encoded: first to a byte array
-   * using this checkpointCoder, then to a base-64 string to conform to Pubsub's payload
-   * conventions.
-   */
-  private final Coder<T> elementCoder;
-
-  /**
    * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use
    * Pubsub message publish timestamp instead.
    */
@@ -1312,12 +1288,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   @Nullable
   private final String idAttribute;
 
-  /**
-   * If not {@literal null}, the user is asking for PubSub attributes. This parse function will be
-   * used to parse {@link PubsubIO.PubsubMessage}s containing a payload and attributes.
-   */
-  @Nullable
-  SimpleFunction<PubsubIO.PubsubMessage, T> parseFn;
+  /** Whether this source should load the attributes of the PubsubMessage, or only the payload. */
+  private final boolean needsAttributes;
 
   @VisibleForTesting
   PubsubUnboundedSource(
@@ -1326,10 +1298,9 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       @Nullable ValueProvider<ProjectPath> project,
       @Nullable ValueProvider<TopicPath> topic,
       @Nullable ValueProvider<SubscriptionPath> subscription,
-      Coder<T> elementCoder,
       @Nullable String timestampAttribute,
       @Nullable String idAttribute,
-      @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
+      boolean needsAttributes) {
     checkArgument((topic == null) != (subscription == null),
                   "Exactly one of topic and subscription must be given");
     checkArgument((topic == null) == (project == null),
@@ -1339,10 +1310,9 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     this.project = project;
     this.topic = topic;
     this.subscription = subscription;
-    this.elementCoder = checkNotNull(elementCoder);
     this.timestampAttribute = timestampAttribute;
     this.idAttribute = idAttribute;
-    this.parseFn = parseFn;
+    this.needsAttributes = needsAttributes;
   }
 
   /**
@@ -1353,27 +1323,18 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       @Nullable ValueProvider<ProjectPath> project,
       @Nullable ValueProvider<TopicPath> topic,
       @Nullable ValueProvider<SubscriptionPath> subscription,
-      Coder<T> elementCoder,
       @Nullable String timestampAttribute,
       @Nullable String idAttribute,
-      @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
+      boolean needsAttributes) {
     this(
         null,
         pubsubFactory,
         project,
         topic,
         subscription,
-        elementCoder,
         timestampAttribute,
         idAttribute,
-        parseFn);
-  }
-
-  /**
-   * Get the coder used for elements.
-   */
-  public Coder<T> getElementCoder() {
-    return elementCoder;
+        needsAttributes);
   }
 
   /**
@@ -1432,20 +1393,16 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     return idAttribute;
   }
 
-  /**
-   * Get the parsing function for PubSub attributes.
-   */
-  @Nullable
-  public SimpleFunction<PubsubIO.PubsubMessage, T> getWithAttributesParseFn() {
-    return parseFn;
+  public boolean getNeedsAttributes() {
+    return needsAttributes;
   }
 
   @Override
-  public PCollection<T> expand(PBegin input) {
+  public PCollection<PubsubIO.PubsubMessage> expand(PBegin input) {
     return input.getPipeline().begin()
-                .apply(Read.from(new PubsubSource<T>(this)))
+                .apply(Read.from(new PubsubSource(this)))
                 .apply("PubsubUnboundedSource.Stats",
-                    ParDo.of(new StatsFn<T>(
+                    ParDo.of(new StatsFn(
                         pubsubFactory, subscription, topic, timestampAttribute, idAttribute)));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 5f06b88..8f5d1ea 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -49,19 +49,19 @@ public class PubsubIOTest {
   @Test
   public void testPubsubIOGetName() {
     assertEquals("PubsubIO.Read",
-        PubsubIO.<String>read().fromTopic("projects/myproject/topics/mytopic").getName());
+        PubsubIO.readStrings().fromTopic("projects/myproject/topics/mytopic").getName());
     assertEquals("PubsubIO.Write",
-        PubsubIO.<String>write().to("projects/myproject/topics/mytopic").getName());
+        PubsubIO.writeStrings().to("projects/myproject/topics/mytopic").getName());
   }
 
   @Test
   public void testTopicValidationSuccess() throws Exception {
-    PubsubIO.<String>read().fromTopic("projects/my-project/topics/abc");
-    PubsubIO.<String>read().fromTopic("projects/my-project/topics/ABC");
-    PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-DeF");
-    PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-1234");
-    PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
-    PubsubIO.<String>read().fromTopic(new StringBuilder()
+    PubsubIO.readStrings().fromTopic("projects/my-project/topics/abc");
+    PubsubIO.readStrings().fromTopic("projects/my-project/topics/ABC");
+    PubsubIO.readStrings().fromTopic("projects/my-project/topics/AbC-DeF");
+    PubsubIO.readStrings().fromTopic("projects/my-project/topics/AbC-1234");
+    PubsubIO.readStrings().fromTopic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
+    PubsubIO.readStrings().fromTopic(new StringBuilder()
         .append("projects/my-project/topics/A-really-long-one-")
         .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
         .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
@@ -72,13 +72,13 @@ public class PubsubIOTest {
   @Test
   public void testTopicValidationBadCharacter() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    PubsubIO.<String>read().fromTopic("projects/my-project/topics/abc-*-abc");
+    PubsubIO.readStrings().fromTopic("projects/my-project/topics/abc-*-abc");
   }
 
   @Test
   public void testTopicValidationTooLong() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    PubsubIO.<String>read().fromTopic(new StringBuilder().append
+    PubsubIO.readStrings().fromTopic(new StringBuilder().append
         ("projects/my-project/topics/A-really-long-one-")
         .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
         .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
@@ -91,7 +91,7 @@ public class PubsubIOTest {
     String topic = "projects/project/topics/topic";
     String subscription = "projects/project/subscriptions/subscription";
     Duration maxReadTime = Duration.standardMinutes(5);
-    PubsubIO.Read<String> read = PubsubIO.<String>read()
+    PubsubIO.Read<String> read = PubsubIO.readStrings()
         .fromTopic(StaticValueProvider.of(topic))
         .withTimestampAttribute("myTimestamp")
         .withIdAttribute("myId");
@@ -108,7 +108,7 @@ public class PubsubIOTest {
     String topic = "projects/project/topics/topic";
     String subscription = "projects/project/subscriptions/subscription";
     Duration maxReadTime = Duration.standardMinutes(5);
-    PubsubIO.Read<String> read = PubsubIO.<String>read()
+    PubsubIO.Read<String> read = PubsubIO.readStrings()
         .fromSubscription(StaticValueProvider.of(subscription))
         .withTimestampAttribute("myTimestamp")
         .withIdAttribute("myId");
@@ -123,7 +123,7 @@ public class PubsubIOTest {
   @Test
   public void testNullTopic() {
     String subscription = "projects/project/subscriptions/subscription";
-    PubsubIO.Read<String> read = PubsubIO.<String>read()
+    PubsubIO.Read<String> read = PubsubIO.readStrings()
         .fromSubscription(StaticValueProvider.of(subscription));
     assertNull(read.getTopicProvider());
     assertNotNull(read.getSubscriptionProvider());
@@ -133,7 +133,7 @@ public class PubsubIOTest {
   @Test
   public void testNullSubscription() {
     String topic = "projects/project/topics/topic";
-    PubsubIO.Read<String> read = PubsubIO.<String>read()
+    PubsubIO.Read<String> read = PubsubIO.readStrings()
         .fromTopic(StaticValueProvider.of(topic));
     assertNotNull(read.getTopicProvider());
     assertNull(read.getSubscriptionProvider());
@@ -166,7 +166,7 @@ public class PubsubIOTest {
   @Test
   public void testWriteDisplayData() {
     String topic = "projects/project/topics/topic";
-    PubsubIO.Write<?> write = PubsubIO.<String>write()
+    PubsubIO.Write<?> write = PubsubIO.writeStrings()
         .to(topic)
         .withTimestampAttribute("myTimestamp")
         .withIdAttribute("myId");
@@ -182,7 +182,7 @@ public class PubsubIOTest {
   @Category(ValidatesRunner.class)
   public void testPrimitiveWriteDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PubsubIO.Write<?> write = PubsubIO.<String>write().to("projects/project/topics/topic");
+    PubsubIO.Write<?> write = PubsubIO.writeStrings().to("projects/project/topics/topic");
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("PubsubIO.Write should include the topic in its primitive display data",

http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
index dc66ea1..592dfa3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -34,12 +34,12 @@ import static org.junit.Assert.assertTrue;
 import com.google.api.client.util.Clock;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
@@ -79,7 +79,7 @@ public class PubsubUnboundedSourceTest {
   private AtomicLong now;
   private Clock clock;
   private PubsubTestClientFactory factory;
-  private PubsubSource<String> primSource;
+  private PubsubSource primSource;
 
   @Rule
   public TestPipeline p = TestPipeline.create();
@@ -93,11 +93,11 @@ public class PubsubUnboundedSourceTest {
       }
     };
     factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming);
-    PubsubUnboundedSource<String> source =
-        new PubsubUnboundedSource<>(
+    PubsubUnboundedSource source =
+        new PubsubUnboundedSource(
             clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION),
-            StringUtf8Coder.of(), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, null);
-    primSource = new PubsubSource<>(source);
+            TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, true /* needsAttributes */);
+    primSource = new PubsubSource(source);
   }
 
   private void setupOneMessage() {
@@ -114,6 +114,10 @@ public class PubsubUnboundedSourceTest {
     factory = null;
   }
 
+  private static String data(PubsubIO.PubsubMessage message) {
+    return new String(message.getMessage(), StandardCharsets.UTF_8);
+  }
+
   @Test
   public void checkpointCoderIsSane() throws Exception {
     setupOneMessage(ImmutableList.<IncomingMessage>of());
@@ -126,13 +130,13 @@ public class PubsubUnboundedSourceTest {
   @Test
   public void readOneMessage() throws IOException {
     setupOneMessage();
-    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    PubsubReader reader = primSource.createReader(p.getOptions(), null);
     // Read one message.
     assertTrue(reader.start());
-    assertEquals(DATA, reader.getCurrent());
+    assertEquals(DATA, data(reader.getCurrent()));
     assertFalse(reader.advance());
     // ACK the message.
-    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    PubsubCheckpoint checkpoint = reader.getCheckpointMark();
     checkpoint.finalizeCheckpoint();
     reader.close();
   }
@@ -140,19 +144,19 @@ public class PubsubUnboundedSourceTest {
   @Test
   public void timeoutAckAndRereadOneMessage() throws IOException {
     setupOneMessage();
-    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    PubsubReader reader = primSource.createReader(p.getOptions(), null);
     PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
     assertTrue(reader.start());
-    assertEquals(DATA, reader.getCurrent());
+    assertEquals(DATA, data(reader.getCurrent()));
     // Let the ACK deadline for the above expire.
     now.addAndGet(65 * 1000);
     pubsubClient.advance();
     // We'll now receive the same message again.
     assertTrue(reader.advance());
-    assertEquals(DATA, reader.getCurrent());
+    assertEquals(DATA, data(reader.getCurrent()));
     assertFalse(reader.advance());
     // Now ACK the message.
-    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    PubsubCheckpoint checkpoint = reader.getCheckpointMark();
     checkpoint.finalizeCheckpoint();
     reader.close();
   }
@@ -160,11 +164,11 @@ public class PubsubUnboundedSourceTest {
   @Test
   public void extendAck() throws IOException {
     setupOneMessage();
-    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    PubsubReader reader = primSource.createReader(p.getOptions(), null);
     PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
     // Pull the first message but don't take a checkpoint for it.
     assertTrue(reader.start());
-    assertEquals(DATA, reader.getCurrent());
+    assertEquals(DATA, data(reader.getCurrent()));
     // Extend the ack
     now.addAndGet(55 * 1000);
     pubsubClient.advance();
@@ -174,7 +178,7 @@ public class PubsubUnboundedSourceTest {
     pubsubClient.advance();
     assertFalse(reader.advance());
     // Now ACK the message.
-    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    PubsubCheckpoint checkpoint = reader.getCheckpointMark();
     checkpoint.finalizeCheckpoint();
     reader.close();
   }
@@ -182,11 +186,11 @@ public class PubsubUnboundedSourceTest {
   @Test
   public void timeoutAckExtensions() throws IOException {
     setupOneMessage();
-    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    PubsubReader reader = primSource.createReader(p.getOptions(), null);
     PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
     // Pull the first message but don't take a checkpoint for it.
     assertTrue(reader.start());
-    assertEquals(DATA, reader.getCurrent());
+    assertEquals(DATA, data(reader.getCurrent()));
     // Extend the ack.
     now.addAndGet(55 * 1000);
     pubsubClient.advance();
@@ -202,9 +206,9 @@ public class PubsubUnboundedSourceTest {
     pubsubClient.advance();
     // Reread the same message.
     assertTrue(reader.advance());
-    assertEquals(DATA, reader.getCurrent());
+    assertEquals(DATA, data(reader.getCurrent()));
     // Now ACK the message.
-    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    PubsubCheckpoint checkpoint = reader.getCheckpointMark();
     checkpoint.finalizeCheckpoint();
     reader.close();
   }
@@ -218,20 +222,20 @@ public class PubsubUnboundedSourceTest {
       incoming.add(new IncomingMessage(data.getBytes(), null, TIMESTAMP, 0, ackid, RECORD_ID));
     }
     setupOneMessage(incoming);
-    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    PubsubReader reader = primSource.createReader(p.getOptions(), null);
     // Consume two messages, only read one.
     assertTrue(reader.start());
-    assertEquals("data_0", reader.getCurrent());
+    assertEquals("data_0", data(reader.getCurrent()));
 
     // Grab checkpoint.
-    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    PubsubCheckpoint checkpoint = reader.getCheckpointMark();
     checkpoint.finalizeCheckpoint();
     assertEquals(1, checkpoint.notYetReadIds.size());
     assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));
 
     // Read second message.
     assertTrue(reader.advance());
-    assertEquals("data_1", reader.getCurrent());
+    assertEquals("data_1", data(reader.getCurrent()));
 
     // Restore from checkpoint.
     byte[] checkpointBytes =
@@ -244,7 +248,7 @@ public class PubsubUnboundedSourceTest {
     // Re-read second message.
     reader = primSource.createReader(p.getOptions(), checkpoint);
     assertTrue(reader.start());
-    assertEquals("data_1", reader.getCurrent());
+    assertEquals("data_1", data(reader.getCurrent()));
 
     // We are done.
     assertFalse(reader.advance());
@@ -278,7 +282,7 @@ public class PubsubUnboundedSourceTest {
     }
     setupOneMessage(incoming);
 
-    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    PubsubReader reader = primSource.createReader(p.getOptions(), null);
     PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
 
     for (int i = 0; i < n; i++) {
@@ -290,7 +294,7 @@ public class PubsubUnboundedSourceTest {
       // We'll checkpoint and ack within the 2min limit.
       now.addAndGet(30);
       pubsubClient.advance();
-      String data = reader.getCurrent();
+      String data = data(reader.getCurrent());
       Integer messageNum = dataToMessageNum.remove(data);
       // No duplicate messages.
       assertNotNull(messageNum);
@@ -310,7 +314,7 @@ public class PubsubUnboundedSourceTest {
         }
         assertThat(watermark, lessThanOrEqualTo(minOutstandingTimestamp));
         // Ack messages, but only every other finalization.
-        PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+        PubsubCheckpoint checkpoint = reader.getCheckpointMark();
         if (i % 2000 == 1999) {
           checkpoint.finalizeCheckpoint();
         }
@@ -327,26 +331,25 @@ public class PubsubUnboundedSourceTest {
   public void noSubscriptionSplitGeneratesSubscription() throws Exception {
     TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
     factory = PubsubTestClient.createFactoryForCreateSubscription();
-    PubsubUnboundedSource<String> source =
-        new PubsubUnboundedSource<>(
+    PubsubUnboundedSource source =
+        new PubsubUnboundedSource(
             factory,
             StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
             StaticValueProvider.of(topicPath),
-            null,
-            StringUtf8Coder.of(),
-            null,
-            null,
-            null);
+            null /* subscription */,
+            null /* timestampLabel */,
+            null /* idLabel */,
+            false /* needsAttributes */);
     assertThat(source.getSubscription(), nullValue());
 
     assertThat(source.getSubscription(), nullValue());
 
     PipelineOptions options = PipelineOptionsFactory.create();
-    List<PubsubSource<String>> splits =
-        (new PubsubSource<>(source)).split(3, options);
+    List<PubsubSource> splits =
+        (new PubsubSource(source)).split(3, options);
     // We have at least one returned split
     assertThat(splits, hasSize(greaterThan(0)));
-    for (PubsubSource<String> split : splits) {
+    for (PubsubSource split : splits) {
       // Each split is equal
       assertThat(split, equalTo(splits.get(0)));
     }
@@ -358,37 +361,36 @@ public class PubsubUnboundedSourceTest {
   public void noSubscriptionNoSplitGeneratesSubscription() throws Exception {
     TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
     factory = PubsubTestClient.createFactoryForCreateSubscription();
-    PubsubUnboundedSource<String> source =
-        new PubsubUnboundedSource<>(
+    PubsubUnboundedSource source =
+        new PubsubUnboundedSource(
             factory,
             StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
             StaticValueProvider.of(topicPath),
-            null,
-            StringUtf8Coder.of(),
-            null,
-            null,
-            null);
+            null /* subscription */,
+            null /* timestampLabel */,
+            null /* idLabel */,
+            false /* needsAttributes */);
     assertThat(source.getSubscription(), nullValue());
 
     assertThat(source.getSubscription(), nullValue());
 
     PipelineOptions options = PipelineOptionsFactory.create();
-    PubsubSource<String> actualSource = new PubsubSource<>(source);
-    PubsubReader<String> reader = actualSource.createReader(options, null);
+    PubsubSource actualSource = new PubsubSource(source);
+    PubsubReader reader = actualSource.createReader(options, null);
     SubscriptionPath createdSubscription = reader.subscription;
     assertThat(createdSubscription, not(nullValue()));
 
-    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    PubsubCheckpoint checkpoint = reader.getCheckpointMark();
     assertThat(checkpoint.subscriptionPath, equalTo(createdSubscription.getPath()));
 
     checkpoint.finalizeCheckpoint();
-    PubsubCheckpoint<String> deserCheckpoint =
+    PubsubCheckpoint deserCheckpoint =
         CoderUtils.clone(actualSource.getCheckpointMarkCoder(), checkpoint);
     assertThat(checkpoint.subscriptionPath, not(nullValue()));
     assertThat(checkpoint.subscriptionPath, equalTo(deserCheckpoint.subscriptionPath));
 
-    PubsubReader<String> readerFromOriginal = actualSource.createReader(options, checkpoint);
-    PubsubReader<String> readerFromDeser = actualSource.createReader(options, deserCheckpoint);
+    PubsubReader readerFromOriginal = actualSource.createReader(options, checkpoint);
+    PubsubReader readerFromDeser = actualSource.createReader(options, deserCheckpoint);
 
     assertThat(readerFromOriginal.subscription, equalTo(createdSubscription));
     assertThat(readerFromDeser.subscription, equalTo(createdSubscription));
@@ -400,9 +402,9 @@ public class PubsubUnboundedSourceTest {
   @Test
   public void closeWithActiveCheckpoints() throws Exception {
     setupOneMessage();
-    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    PubsubReader reader = primSource.createReader(p.getOptions(), null);
     reader.start();
-    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    PubsubCheckpoint checkpoint = reader.getCheckpointMark();
     reader.close();
     checkpoint.finalizeCheckpoint();
   }