You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2021/04/08 23:03:59 UTC

[beam] branch master updated: Change PubSubSource and PubSubSink translation to avoid special transform overrides.

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c472530  Change PubSubSource and PubSubSink translation to avoid special transform overrides.
     new a48abeb  Merge pull request #14384 from [BEAM-10861] Change PubSubSource and PubSubSink translation to avoid special transform overrides
c472530 is described below

commit c4725301da8f1fbc3982bca986f4d9a1b9a4ce19
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Tue Mar 30 18:00:59 2021 -0700

    Change PubSubSource and PubSubSink translation to avoid special transform overrides.
---
 .../pipeline/src/main/proto/beam_runner_api.proto  |   4 +
 .../beam/runners/dataflow/DataflowRunner.java      | 180 +-------------------
 sdks/java/io/google-cloud-platform/build.gradle    |   3 +-
 .../io/gcp/pubsub/PubSubPayloadTranslation.java    | 159 +++++++++++++++++
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java     |  78 ++++++---
 .../sdk/io/gcp/pubsub/PubsubUnboundedSource.java   |  73 +++++---
 .../sdk/io/gcp/pubsub/RunnerImplementedSink.java   |  68 --------
 .../pubsub/RunnerImplementedSinkTranslation.java   |  87 ----------
 .../sdk/io/gcp/pubsub/RunnerImplementedSource.java |  83 ---------
 .../pubsub/RunnerImplementedSourceTranslation.java | 102 -----------
 ....java => PubSubReadPayloadTranslationTest.java} | 189 +++++++++++----------
 ...java => PubSubWritePayloadTranslationTest.java} |  37 ++--
 .../sdk/io/gcp/pubsub/PubsubIOExternalTest.java    |  12 +-
 .../io/gcp/pubsub/PubsubUnboundedSourceTest.java   |  80 +++++++--
 14 files changed, 450 insertions(+), 705 deletions(-)

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 134fcb6..138e352 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -696,6 +696,8 @@ message WriteFilesPayload {
 // with a native implementation.
 // The SDK should guarantee that only one of topic, subscription,
 // topic_runtime_overridden and subscription_runtime_overridden is set.
+// The output of PubSubReadPayload should be bytes of serialized PubsubMessage
+// proto if with_attributes == true. Otherwise, the bytes is the raw payload.
 message PubSubReadPayload {
 
   // Topic to read from. Exactly one of topic or subscription should be set.
@@ -727,6 +729,8 @@ message PubSubReadPayload {
 // with a native implementation.
 // The SDK should guarantee that only one of topic and topic_runtime_overridden
 // is set.
+// The output of PubSubWritePayload should be bytes if serialized PubsubMessage
+// proto.
 message PubSubWritePayload {
 
   // Topic to write to.
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a06951f..cdb7e67 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -95,7 +95,6 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -113,12 +112,8 @@ import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
-import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSink;
-import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -137,11 +132,9 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupIntoBatches;
 import org.apache.beam.sdk.transforms.Impulse;
-import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Reshuffle;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
@@ -165,7 +158,6 @@ import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -207,7 +199,6 @@ import org.slf4j.LoggerFactory;
 public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
   private static final Logger LOG = LoggerFactory.getLogger(DataflowRunner.class);
-
   /** Provided configuration options. */
   private final DataflowPipelineOptions options;
 
@@ -455,27 +446,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
   }
 
-  /** For portable jobs, Dataflow still requires an override of the PubsubIO transforms. */
-  private List<PTransformOverride> getPortableOverrides() {
-    ImmutableList.Builder<PTransformOverride> overridesBuilder = ImmutableList.builder();
-
-    if (!hasExperiment(options, "enable_custom_pubsub_source")) {
-      overridesBuilder.add(
-          PTransformOverride.of(
-              PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
-              new DataflowReadFromPubsubSourceForRunnerV2OverrideFactory()));
-    }
-
-    if (!hasExperiment(options, "enable_custom_pubsub_sink")) {
-      overridesBuilder.add(
-          PTransformOverride.of(
-              PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
-              new DataflowWriteToPubsubRunnerV2OverrideFactory()));
-    }
-
-    return overridesBuilder.build();
-  }
-
   private List<PTransformOverride> getOverrides(boolean streaming) {
     boolean fnApiEnabled = useUnifiedWorker(options);
     ImmutableList.Builder<PTransformOverride> overridesBuilder = ImmutableList.builder();
@@ -506,12 +476,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
     if (streaming) {
       if (!hasExperiment(options, "enable_custom_pubsub_source")) {
-        if (useUnifiedWorker(options)) {
-          overridesBuilder.add(
-              PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
-                  new DataflowReadFromPubsubSourceForRunnerV2OverrideFactory()));
-        } else {
+        if (!useUnifiedWorker(options)) {
           overridesBuilder.add(
               PTransformOverride.of(
                   PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
@@ -519,12 +484,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         }
       }
       if (!hasExperiment(options, "enable_custom_pubsub_sink")) {
-        if (useUnifiedWorker(options)) {
-          overridesBuilder.add(
-              PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
-                  new DataflowWriteToPubsubRunnerV2OverrideFactory()));
-        } else {
+        if (!useUnifiedWorker(options)) {
           overridesBuilder.add(
               PTransformOverride.of(
                   PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
@@ -989,9 +949,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    if (useUnifiedWorker(options)) {
-      pipeline.replaceAll(getPortableOverrides());
-    }
     RunnerApi.Pipeline portablePipelineProto =
         PipelineTranslation.toProto(pipeline, portableComponents, false);
     LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
@@ -1495,25 +1452,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
-  private static class DataflowReadFromPubsubSourceForRunnerV2OverrideFactory
-      implements PTransformOverrideFactory<
-          PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> {
-
-    @Override
-    public PTransformReplacement<PBegin, PCollection<PubsubMessage>> getReplacementTransform(
-        AppliedPTransform<PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> transform) {
-      return PTransformReplacement.of(
-          transform.getPipeline().begin(),
-          new DataflowReadFromPubsubForRunnerV2(transform.getTransform()));
-    }
-
-    @Override
-    public Map<PCollection<?>, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PCollection<?>> outputs, PCollection<PubsubMessage> newOutput) {
-      return ReplacementOutputs.singleton(outputs, newOutput);
-    }
-  }
-
   /**
    * Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we can
    * instead defer to Windmill's implementation.
@@ -1552,38 +1490,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
-  public static class DataflowReadFromPubsubForRunnerV2
-      extends PTransform<PBegin, PCollection<PubsubMessage>> {
-
-    private final PubsubUnboundedSource transform;
-
-    public DataflowReadFromPubsubForRunnerV2(PubsubUnboundedSource transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollection<PubsubMessage> expand(PBegin input) {
-      PCollection<byte[]> outputFromRunner =
-          input.apply(
-              "DataflowReadFromPubsubRunnerV2", new RunnerImplementedSource(this.transform));
-
-      SerializableFunction<byte[], PubsubMessage> function;
-      if (transform.getNeedsAttributes() || transform.getNeedsMessageId()) {
-        function = new PubsubMessages.ParsePubsubMessageProtoAsPayload();
-      } else {
-        function = new DeserializeBytesIntoPubsubMessagePayloadOnly();
-      }
-      return outputFromRunner.apply(
-          "MapBytesToPubsubMessages",
-          MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via(function));
-    }
-
-    static {
-      DataflowPipelineTranslator.registerTransformTranslator(
-          RunnerImplementedSource.class, new DataflowPubsubSourceRunnerV2Translator());
-    }
-  }
-
   private static void translateOverriddenPubsubSourceStep(
       PubsubUnboundedSource overriddenTransform, StepTranslationContext stepTranslationContext) {
     stepTranslationContext.addInput(PropertyNames.FORMAT, "pubsub");
@@ -1642,20 +1548,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
-  private static class DataflowPubsubSourceRunnerV2Translator
-      implements TransformTranslator<RunnerImplementedSource> {
-
-    @Override
-    public void translate(RunnerImplementedSource transform, TranslationContext context) {
-      checkArgument(
-          context.getPipelineOptions().isStreaming(),
-          "DataflowPubsubSourceRunnerV2Translator is only for streaming pipelines.");
-      StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
-      translateOverriddenPubsubSourceStep(transform.getOverriddenSource(), stepContext);
-      stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-    }
-  }
-
   private static class IdentityMessageFn extends SimpleFunction<PubsubMessage, PubsubMessage> {
 
     @Override
@@ -1715,20 +1607,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       }
     }
 
-    /** Rewrite {@link RunnerImplementedSink} to the appropriate internal node. */
-    static class DataflowRunnerV2PubsubSinkTranslator
-        implements TransformTranslator<RunnerImplementedSink> {
-      @Override
-      public void translate(RunnerImplementedSink transform, TranslationContext context) {
-        checkArgument(
-            context.getPipelineOptions().isStreaming(),
-            "DataflowRunnerV2PubsubSink is only for streaming pipelines.");
-        StepTranslationContext stepContext = context.addStep(transform, "ParallelWrite");
-        StreamingPubsubSinkTranslators.translate(
-            transform.getOverrideSink(), stepContext, context.getInput(transform));
-      }
-    }
-
     private static void translate(
         PubsubUnboundedSink overriddenTransform,
         StepTranslationContext stepContext,
@@ -2206,60 +2084,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
-  /**
-   * A replacement {@link PTransform} for {@link PubsubUnboundedSink} when using dataflow runner v2.
-   */
-  private static class DataflowWriteToPubsubForRunnerV2
-      extends PTransform<PCollection<PubsubMessage>, PDone> {
-
-    private final PubsubUnboundedSink transform;
-
-    public DataflowWriteToPubsubForRunnerV2(PubsubUnboundedSink transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone expand(PCollection<PubsubMessage> input) {
-      input
-          .apply(
-              "Output Serialized PubsubMessage Proto",
-              MapElements.into(new TypeDescriptor<byte[]>() {})
-                  .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto()))
-          .setCoder(ByteArrayCoder.of())
-          .apply("Write to Runner Implemented Sink", new RunnerImplementedSink(transform));
-
-      return PDone.in(input.getPipeline());
-    }
-
-    static {
-      DataflowPipelineTranslator.registerTransformTranslator(
-          RunnerImplementedSink.class,
-          new StreamingPubsubSinkTranslators.DataflowRunnerV2PubsubSinkTranslator());
-    }
-  }
-
-  /**
-   * A {@link PTransformOverrideFactory} to provide replacement {@link PTransform} for {@link
-   * PubsubUnboundedSink} when using dataflow runner v2.
-   */
-  private static class DataflowWriteToPubsubRunnerV2OverrideFactory
-      implements PTransformOverrideFactory<PCollection<PubsubMessage>, PDone, PubsubUnboundedSink> {
-
-    @Override
-    public PTransformReplacement<PCollection<PubsubMessage>, PDone> getReplacementTransform(
-        AppliedPTransform<PCollection<PubsubMessage>, PDone, PubsubUnboundedSink> transform) {
-      return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(transform),
-          new DataflowWriteToPubsubForRunnerV2(transform.getTransform()));
-    }
-
-    @Override
-    public Map<PCollection<?>, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PCollection<?>> outputs, PDone newOutput) {
-      return Collections.emptyMap();
-    }
-  }
-
   @VisibleForTesting
   static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
       implements PTransformOverrideFactory<
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 4b25f56..579a352 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -23,8 +23,7 @@ applyJavaNature(
   automaticModuleName: 'org.apache.beam.sdk.io.gcp',
   enableSpotbugs: false,
   classesTriggerCheckerBugs: [
-    'RunnerImplementedSourceTranslation': 'https://github.com/typetools/checker-framework/issues/3791',
-    'RunnerImplementedSinkTranslation': 'https://github.com/typetools/checker-framework/issues/3791',
+    'PubSubPayloadTranslation': 'https://github.com/typetools/checker-framework/issues/3791',
   ],
 )
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java
new file mode 100644
index 0000000..a1a2675
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubReadPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubWritePayload;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Read.Unbounded;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubSource;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/**
+ * Utility methods for translating a {@link Unbounded} which reads from {@link
+ * PubsubUnboundedSource} to {@link RunnerApi} representations.
+ */
+public class PubSubPayloadTranslation {
+  static class PubSubReadPayloadTranslator
+      implements TransformPayloadTranslator<Read.Unbounded<?>> {
+
+    @Override
+    public String getUrn(Read.Unbounded<?> transform) {
+      if (!(transform.getSource() instanceof PubsubUnboundedSource.PubsubSource)) {
+        return null;
+      }
+      return PTransformTranslation.PUBSUB_READ;
+    }
+
+    @Override
+    public RunnerApi.FunctionSpec translate(
+        AppliedPTransform<?, ?, Unbounded<?>> transform, SdkComponents components) {
+      if (!(transform.getTransform().getSource() instanceof PubsubUnboundedSource.PubsubSource)) {
+        return null;
+      }
+      PubSubReadPayload.Builder payloadBuilder = PubSubReadPayload.newBuilder();
+      PubsubUnboundedSource pubsubUnboundedSource =
+          ((PubsubSource) transform.getTransform().getSource()).outer;
+      ValueProvider<TopicPath> topicProvider = pubsubUnboundedSource.getTopicProvider();
+      if (topicProvider != null) {
+        if (topicProvider.isAccessible()) {
+          payloadBuilder.setTopic(topicProvider.get().getFullPath());
+        } else {
+          payloadBuilder.setTopicRuntimeOverridden(
+              ((NestedValueProvider) topicProvider).propertyName());
+        }
+      }
+      ValueProvider<SubscriptionPath> subscriptionProvider =
+          pubsubUnboundedSource.getSubscriptionProvider();
+      if (subscriptionProvider != null) {
+        if (subscriptionProvider.isAccessible()) {
+          payloadBuilder.setSubscription(subscriptionProvider.get().getFullPath());
+        } else {
+          payloadBuilder.setSubscriptionRuntimeOverridden(
+              ((NestedValueProvider) subscriptionProvider).propertyName());
+        }
+      }
+
+      if (pubsubUnboundedSource.getTimestampAttribute() != null) {
+        payloadBuilder.setTimestampAttribute(pubsubUnboundedSource.getTimestampAttribute());
+      }
+      if (pubsubUnboundedSource.getIdAttribute() != null) {
+        payloadBuilder.setIdAttribute(pubsubUnboundedSource.getIdAttribute());
+      }
+      payloadBuilder.setWithAttributes(
+          pubsubUnboundedSource.getNeedsAttributes() || pubsubUnboundedSource.getNeedsMessageId());
+      return FunctionSpec.newBuilder()
+          .setUrn(getUrn(transform.getTransform()))
+          .setPayload(payloadBuilder.build().toByteString())
+          .build();
+    }
+  }
+
+  static class PubSubWritePayloadTranslator
+      implements TransformPayloadTranslator<PubsubUnboundedSink.PubsubSink> {
+
+    @Override
+    public String getUrn(PubsubUnboundedSink.PubsubSink transform) {
+      return PTransformTranslation.PUBSUB_WRITE;
+    }
+
+    @Override
+    public RunnerApi.FunctionSpec translate(
+        AppliedPTransform<?, ?, PubsubUnboundedSink.PubsubSink> transform,
+        SdkComponents components) {
+      PubSubWritePayload.Builder payloadBuilder = PubSubWritePayload.newBuilder();
+      ValueProvider<TopicPath> topicProvider = transform.getTransform().outer.getTopicProvider();
+      if (topicProvider.isAccessible()) {
+        payloadBuilder.setTopic(topicProvider.get().getFullPath());
+      } else {
+        payloadBuilder.setTopicRuntimeOverridden(
+            ((NestedValueProvider) topicProvider).propertyName());
+      }
+      if (transform.getTransform().outer.getTimestampAttribute() != null) {
+        payloadBuilder.setTimestampAttribute(
+            transform.getTransform().outer.getTimestampAttribute());
+      }
+      if (transform.getTransform().outer.getIdAttribute() != null) {
+        payloadBuilder.setIdAttribute(transform.getTransform().outer.getIdAttribute());
+      }
+      return FunctionSpec.newBuilder()
+          .setUrn(getUrn(transform.getTransform()))
+          .setPayload(payloadBuilder.build().toByteString())
+          .build();
+    }
+  }
+
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar {
+
+    @Override
+    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return Collections.singletonMap(
+          PubsubUnboundedSink.PubsubSink.class, new PubSubWritePayloadTranslator());
+    }
+  }
+
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar {
+
+    @Override
+    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return Collections.singletonMap(Read.Unbounded.class, new PubSubReadPayloadTranslator());
+    }
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 5c9fca3..e6ce089 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -46,6 +47,7 @@ import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -59,6 +61,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -141,7 +144,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
   // ================================================================================
 
   /** Convert elements to messages and shard them. */
-  private static class ShardFn extends DoFn<PubsubMessage, KV<Integer, OutgoingMessage>> {
+  private static class ShardFn extends DoFn<byte[], KV<Integer, OutgoingMessage>> {
     private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements");
     private final int numShards;
     private final RecordIdMethod recordIdMethod;
@@ -154,8 +157,9 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       elementCounter.inc();
-      PubsubMessage message = c.element();
-      byte[] elementBytes = message.getPayload();
+      com.google.pubsub.v1.PubsubMessage message =
+          com.google.pubsub.v1.PubsubMessage.parseFrom(c.element());
+      byte[] elementBytes = message.getData().toByteArray();
 
       long timestampMsSinceEpoch = c.timestamp().getMillis();
       @Nullable String recordId = null;
@@ -409,29 +413,51 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
 
   @Override
   public PDone expand(PCollection<PubsubMessage> input) {
-    input
-        .apply(
-            "PubsubUnboundedSink.Window",
-            Window.<PubsubMessage>into(new GlobalWindows())
-                .triggering(
-                    Repeatedly.forever(
-                        AfterFirst.of(
-                            AfterPane.elementCountAtLeast(publishBatchSize),
-                            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(maxLatency))))
-                .discardingFiredPanes())
-        .apply("PubsubUnboundedSink.Shard", ParDo.of(new ShardFn(numShards, recordIdMethod)))
-        .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
-        .apply(GroupByKey.create())
+    return input
         .apply(
-            "PubsubUnboundedSink.Writer",
-            ParDo.of(
-                new WriterFn(
-                    pubsubFactory,
-                    topic,
-                    timestampAttribute,
-                    idAttribute,
-                    publishBatchSize,
-                    publishBatchBytes)));
-    return PDone.in(input.getPipeline());
+            "Output Serialized PubsubMessage Proto",
+            MapElements.into(new TypeDescriptor<byte[]>() {})
+                .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto()))
+        .setCoder(ByteArrayCoder.of())
+        .apply(new PubsubSink(this));
+  }
+
+  static class PubsubSink extends PTransform<PCollection<byte[]>, PDone> {
+    public final PubsubUnboundedSink outer;
+
+    PubsubSink(PubsubUnboundedSink outer) {
+      this.outer = outer;
+    }
+
+    @Override
+    public PDone expand(PCollection<byte[]> input) {
+      input
+          .apply(
+              "PubsubUnboundedSink.Window",
+              Window.<byte[]>into(new GlobalWindows())
+                  .triggering(
+                      Repeatedly.forever(
+                          AfterFirst.of(
+                              AfterPane.elementCountAtLeast(outer.publishBatchSize),
+                              AfterProcessingTime.pastFirstElementInPane()
+                                  .plusDelayOf(outer.maxLatency))))
+                  .discardingFiredPanes())
+          .apply(
+              "PubsubUnboundedSink.Shard",
+              ParDo.of(new ShardFn(outer.numShards, outer.recordIdMethod)))
+          .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
+          .apply(GroupByKey.create())
+          .apply(
+              "PubsubUnboundedSink.Writer",
+              ParDo.of(
+                  new WriterFn(
+                      outer.pubsubFactory,
+                      outer.topic,
+                      outer.timestampAttribute,
+                      outer.idAttribute,
+                      outer.publishBatchSize,
+                      outer.publishBatchBytes)));
+      return PDone.in(input.getPipeline());
+    }
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 37d0e94..bd5e868 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
@@ -53,15 +54,19 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.SourceMetrics;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -70,6 +75,7 @@ import org.apache.beam.sdk.util.BucketingFunction;
 import org.apache.beam.sdk.util.MovingFunction;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
@@ -358,7 +364,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
    * consumed downstream and/or ACKed back to Pubsub.
    */
   @VisibleForTesting
-  static class PubsubReader extends UnboundedSource.UnboundedReader<PubsubMessage> {
+  static class PubsubReader extends UnboundedSource.UnboundedReader<byte[]> {
     /** For access to topic and checkpointCoder. */
     private final PubsubSource outer;
 
@@ -882,14 +888,14 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
     }
 
     @Override
-    public PubsubMessage getCurrent() throws NoSuchElementException {
+    public byte[] getCurrent() throws NoSuchElementException {
       if (current == null) {
         throw new NoSuchElementException();
       }
-      return new PubsubMessage(
-          current.message().getData().toByteArray(),
-          current.message().getAttributesMap(),
-          current.recordId());
+      if (this.outer.outer.getNeedsMessageId() || this.outer.outer.getNeedsAttributes()) {
+        return current.message().toByteArray();
+      }
+      return current.message().getData().toByteArray();
     }
 
     @Override
@@ -1010,7 +1016,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
   // ================================================================================
 
   @VisibleForTesting
-  static class PubsubSource extends UnboundedSource<PubsubMessage, PubsubCheckpoint> {
+  static class PubsubSource extends UnboundedSource<byte[], PubsubCheckpoint> {
     public final PubsubUnboundedSource outer;
     // The subscription to read from.
     @VisibleForTesting final ValueProvider<SubscriptionPath> subscriptionPath;
@@ -1086,16 +1092,8 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
     }
 
     @Override
-    public Coder<PubsubMessage> getOutputCoder() {
-      if (outer.getNeedsMessageId()) {
-        return outer.getNeedsAttributes()
-            ? PubsubMessageWithAttributesAndMessageIdCoder.of()
-            : PubsubMessageWithMessageIdCoder.of();
-      } else {
-        return outer.getNeedsAttributes()
-            ? PubsubMessageWithAttributesCoder.of()
-            : PubsubMessagePayloadOnlyCoder.of();
-      }
+    public Coder<byte[]> getOutputCoder() {
+      return ByteArrayCoder.of();
     }
 
     @Override
@@ -1336,14 +1334,39 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
 
   @Override
   public PCollection<PubsubMessage> expand(PBegin input) {
-    return input
-        .getPipeline()
-        .begin()
-        .apply(Read.from(new PubsubSource(this)))
-        .apply(
-            "PubsubUnboundedSource.Stats",
-            ParDo.of(
-                new StatsFn(pubsubFactory, subscription, topic, timestampAttribute, idAttribute)));
+    SerializableFunction<byte[], PubsubMessage> function;
+    if (getNeedsAttributes() || getNeedsMessageId()) {
+      function = new PubsubMessages.ParsePubsubMessageProtoAsPayload();
+    } else {
+      function = new DeserializeBytesIntoPubsubMessagePayloadOnly();
+    }
+    PCollection<PubsubMessage> messages =
+        input
+            .getPipeline()
+            .begin()
+            .apply(Read.from(new PubsubSource(this)))
+            .apply(
+                "MapBytesToPubsubMessages",
+                MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via(function));
+    if (usesStatsFn(input.getPipeline().getOptions())) {
+      messages =
+          messages.apply(
+              "PubsubUnboundedSource.Stats",
+              ParDo.of(
+                  new StatsFn(
+                      pubsubFactory, subscription, topic, timestampAttribute, idAttribute)));
+    }
+    return messages;
+  }
+
+  private boolean usesStatsFn(PipelineOptions options) {
+    if (ExperimentalOptions.hasExperiment(options, "enable_custom_pubsub_source")) {
+      return true;
+    }
+    if (!options.getRunner().getName().startsWith("org.apache.beam.runners.dataflow.")) {
+      return true;
+    }
+    return false;
   }
 
   private SubscriptionPath createRandomSubscription(PipelineOptions options) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSink.java
deleted file mode 100644
index 6c70b50..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSink.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-@SuppressWarnings({
-  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-/**
- * A {@link PTransform} which represents a runner implemented Pubsub sink. {@link
- * RunnerImplementedSinkTranslator} will translate this transform into well-known composite.
- */
-@Internal
-public class RunnerImplementedSink extends PTransform<PCollection<byte[]>, PDone> {
-  private final PubsubUnboundedSink sink;
-
-  public RunnerImplementedSink(PubsubUnboundedSink sink) {
-    this.sink = sink;
-  }
-
-  public PubsubUnboundedSink getOverrideSink() {
-    return sink;
-  }
-
-  public ValueProvider<TopicPath> getTopicProvider() {
-    return sink.getTopicProvider();
-  }
-
-  public String getTimestampAttribute() {
-    return sink.getTimestampAttribute();
-  }
-
-  public String getIdAttribute() {
-    return sink.getIdAttribute();
-  }
-
-  @Override
-  public PDone expand(PCollection<byte[]> input) {
-    return PDone.in(input.getPipeline());
-  }
-
-  @Override
-  protected String getKindString() {
-    return "RunnerImplementedSink";
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslation.java
deleted file mode 100644
index 056871f..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslation.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import com.google.auto.service.AutoService;
-import java.util.Collections;
-import java.util.Map;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubWritePayload;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
-import org.apache.beam.runners.core.construction.SdkComponents;
-import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-
-@SuppressWarnings({
-  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-/**
- * Utility methods for translating a {@link RunnerImplementedSink} to {@link RunnerApi}
- * representations.
- */
-public class RunnerImplementedSinkTranslation {
-  static class RunnerImplementedSinkTranslator
-      implements TransformPayloadTranslator<RunnerImplementedSink> {
-
-    @Override
-    public String getUrn(RunnerImplementedSink transform) {
-      return PTransformTranslation.PUBSUB_WRITE;
-    }
-
-    @Override
-    public RunnerApi.FunctionSpec translate(
-        AppliedPTransform<?, ?, RunnerImplementedSink> transform, SdkComponents components) {
-      PubSubWritePayload.Builder payloadBuilder = PubSubWritePayload.newBuilder();
-      ValueProvider<TopicPath> topicProvider = transform.getTransform().getTopicProvider();
-      if (topicProvider.isAccessible()) {
-        payloadBuilder.setTopic(topicProvider.get().getFullPath());
-      } else {
-        payloadBuilder.setTopicRuntimeOverridden(
-            ((NestedValueProvider) topicProvider).propertyName());
-      }
-      if (transform.getTransform().getTimestampAttribute() != null) {
-        payloadBuilder.setTimestampAttribute(transform.getTransform().getTimestampAttribute());
-      }
-      if (transform.getTransform().getIdAttribute() != null) {
-        payloadBuilder.setIdAttribute(transform.getTransform().getIdAttribute());
-      }
-      return FunctionSpec.newBuilder()
-          .setUrn(getUrn(transform.getTransform()))
-          .setPayload(payloadBuilder.build().toByteString())
-          .build();
-    }
-  }
-
-  @AutoService(TransformPayloadTranslatorRegistrar.class)
-  public static class Registrar implements TransformPayloadTranslatorRegistrar {
-
-    @Override
-    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
-        getTransformPayloadTranslators() {
-      return Collections.singletonMap(
-          RunnerImplementedSink.class, new RunnerImplementedSinkTranslator());
-    }
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSource.java
deleted file mode 100644
index b1170fc..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSource.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-@SuppressWarnings({
-  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-/**
- * A {@link PTransform} which represents a runner implemented Pubsub source. {@link
- * RunnerImplementedSourceTranslator} will translate this transform into well-known composite.
- */
-@Internal
-public class RunnerImplementedSource extends PTransform<PBegin, PCollection<byte[]>> {
-  private final PubsubUnboundedSource source;
-
-  public RunnerImplementedSource(PubsubUnboundedSource source) {
-    this.source = source;
-  }
-
-  public PubsubUnboundedSource getOverriddenSource() {
-    return source;
-  }
-
-  public ValueProvider<TopicPath> getTopicProvider() {
-    return source.getTopicProvider();
-  }
-
-  public ValueProvider<SubscriptionPath> getSubscriptionProvider() {
-    return source.getSubscriptionProvider();
-  }
-
-  public String getTimestampAttribute() {
-    return source.getTimestampAttribute();
-  }
-
-  public String getIdAttribute() {
-    return source.getIdAttribute();
-  }
-
-  public boolean isWithAttributes() {
-    return source.getNeedsAttributes() || source.getNeedsMessageId();
-  }
-
-  @Override
-  public PCollection<byte[]> expand(PBegin input) {
-    ByteArrayCoder coder = ByteArrayCoder.of();
-    return PCollection.createPrimitiveOutputInternal(
-            input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, coder)
-        .setCoder(coder);
-  }
-
-  @Override
-  protected String getKindString() {
-    return "RunnerImplementedSource";
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslation.java
deleted file mode 100644
index 5f002d0..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslation.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import com.google.auto.service.AutoService;
-import java.util.Collections;
-import java.util.Map;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubReadPayload;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
-import org.apache.beam.runners.core.construction.SdkComponents;
-import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-
-@SuppressWarnings({
-  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-/**
- * Utility methods for translating a {@link RunnerImplementedSource} to {@link RunnerApi}
- * representations.
- */
-public class RunnerImplementedSourceTranslation {
-  static class RunnerImplementedSourceTranslator
-      implements TransformPayloadTranslator<RunnerImplementedSource> {
-
-    @Override
-    public String getUrn(RunnerImplementedSource transform) {
-      return PTransformTranslation.PUBSUB_READ;
-    }
-
-    @Override
-    public RunnerApi.FunctionSpec translate(
-        AppliedPTransform<?, ?, RunnerImplementedSource> transform, SdkComponents components) {
-      PubSubReadPayload.Builder payloadBuilder = PubSubReadPayload.newBuilder();
-      ValueProvider<TopicPath> topicProvider = transform.getTransform().getTopicProvider();
-      if (topicProvider != null) {
-        if (topicProvider.isAccessible()) {
-          payloadBuilder.setTopic(topicProvider.get().getFullPath());
-        } else {
-          payloadBuilder.setTopicRuntimeOverridden(
-              ((NestedValueProvider) topicProvider).propertyName());
-        }
-      }
-      ValueProvider<SubscriptionPath> subscriptionProvider =
-          transform.getTransform().getSubscriptionProvider();
-      if (subscriptionProvider != null) {
-        if (subscriptionProvider.isAccessible()) {
-          payloadBuilder.setSubscription(subscriptionProvider.get().getFullPath());
-        } else {
-          payloadBuilder.setSubscriptionRuntimeOverridden(
-              ((NestedValueProvider) subscriptionProvider).propertyName());
-        }
-      }
-
-      if (transform.getTransform().getTimestampAttribute() != null) {
-        payloadBuilder.setTimestampAttribute(transform.getTransform().getTimestampAttribute());
-      }
-      if (transform.getTransform().getIdAttribute() != null) {
-        payloadBuilder.setIdAttribute(transform.getTransform().getIdAttribute());
-      }
-      payloadBuilder.setWithAttributes(transform.getTransform().isWithAttributes());
-      return FunctionSpec.newBuilder()
-          .setUrn(getUrn(transform.getTransform()))
-          .setPayload(payloadBuilder.build().toByteString())
-          .build();
-    }
-  }
-
-  @AutoService(TransformPayloadTranslatorRegistrar.class)
-  public static class Registrar implements TransformPayloadTranslatorRegistrar {
-
-    @Override
-    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
-        getTransformPayloadTranslators() {
-      return Collections.singletonMap(
-          RunnerImplementedSource.class, new RunnerImplementedSourceTranslator());
-    }
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubReadPayloadTranslationTest.java
similarity index 53%
rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslationTest.java
rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubReadPayloadTranslationTest.java
index d1cea3a..9e43b59 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslationTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubReadPayloadTranslationTest.java
@@ -25,9 +25,9 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubReadPayload;
 import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSourceTranslation.RunnerImplementedSourceTranslator;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -43,15 +43,15 @@ import org.junit.runners.Parameterized.Parameters;
 
 /** Test RunnerImplementedSourceTranslator. */
 @RunWith(Parameterized.class)
-public class RunnerImplementedSourceTranslationTest {
+public class PubSubReadPayloadTranslationTest {
   private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
   private static final String ID_ATTRIBUTE = "id";
   private static final String PROJECT = "project";
   private static final TopicPath TOPIC = PubsubClient.topicPathFromName(PROJECT, "testTopic");
   private static final SubscriptionPath SUBSCRIPTION =
       PubsubClient.subscriptionPathFromName(PROJECT, "testSubscription");
-  private final RunnerImplementedSourceTranslator sourceTranslator =
-      new RunnerImplementedSourceTranslator();
+  private final PubSubPayloadTranslation.PubSubReadPayloadTranslator sourceTranslator =
+      new PubSubPayloadTranslation.PubSubReadPayloadTranslator();
 
   public static TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
   private static final ValueProvider<TopicPath> TOPIC_PROVIDER = pipeline.newProvider(TOPIC);
@@ -64,16 +64,17 @@ public class RunnerImplementedSourceTranslationTest {
         new Object[][] {
           {
             // Read payload only from TOPIC.
-            new RunnerImplementedSource(
-                new PubsubUnboundedSource(
-                    PubsubTestClient.createFactoryForCreateSubscription(),
-                    StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
-                    StaticValueProvider.of(TOPIC),
-                    null /* subscription */,
-                    null /* timestampLabel */,
-                    null /* idLabel */,
-                    false /* needsAttributes */,
-                    false /* needsMessageId*/)),
+            Read.from(
+                new PubsubUnboundedSource.PubsubSource(
+                    new PubsubUnboundedSource(
+                        PubsubTestClient.createFactoryForCreateSubscription(),
+                        StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+                        StaticValueProvider.of(TOPIC),
+                        null /* subscription */,
+                        null /* timestampLabel */,
+                        null /* idLabel */,
+                        false /* needsAttributes */,
+                        false /* needsMessageId*/))),
             PubSubReadPayload.newBuilder()
                 .setTopic(TOPIC.getFullPath())
                 .setWithAttributes(false)
@@ -81,16 +82,17 @@ public class RunnerImplementedSourceTranslationTest {
           },
           {
             // Read with attributes and message id from TOPIC.
-            new RunnerImplementedSource(
-                new PubsubUnboundedSource(
-                    PubsubTestClient.createFactoryForCreateSubscription(),
-                    StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
-                    StaticValueProvider.of(TOPIC),
-                    null /* subscription */,
-                    TIMESTAMP_ATTRIBUTE /* timestampLabel */,
-                    ID_ATTRIBUTE /* idLabel */,
-                    true /* needsAttributes */,
-                    true /* needsMessageId */)),
+            Read.from(
+                new PubsubUnboundedSource.PubsubSource(
+                    new PubsubUnboundedSource(
+                        PubsubTestClient.createFactoryForCreateSubscription(),
+                        StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+                        StaticValueProvider.of(TOPIC),
+                        null /* subscription */,
+                        TIMESTAMP_ATTRIBUTE /* timestampLabel */,
+                        ID_ATTRIBUTE /* idLabel */,
+                        true /* needsAttributes */,
+                        true /* needsMessageId */))),
             PubSubReadPayload.newBuilder()
                 .setTopic(TOPIC.getFullPath())
                 .setIdAttribute(ID_ATTRIBUTE)
@@ -100,16 +102,17 @@ public class RunnerImplementedSourceTranslationTest {
           },
           {
             // Read payload from runtime provided topic.
-            new RunnerImplementedSource(
-                new PubsubUnboundedSource(
-                    PubsubTestClient.createFactoryForCreateSubscription(),
-                    StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
-                    TOPIC_PROVIDER,
-                    null /* subscription */,
-                    null /* timestampLabel */,
-                    null /* idLabel */,
-                    false /* needsAttributes */,
-                    false /* needsMessageId */)),
+            Read.from(
+                new PubsubUnboundedSource.PubsubSource(
+                    new PubsubUnboundedSource(
+                        PubsubTestClient.createFactoryForCreateSubscription(),
+                        StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+                        TOPIC_PROVIDER,
+                        null /* subscription */,
+                        null /* timestampLabel */,
+                        null /* idLabel */,
+                        false /* needsAttributes */,
+                        false /* needsMessageId */))),
             PubSubReadPayload.newBuilder()
                 .setTopicRuntimeOverridden(((NestedValueProvider) TOPIC_PROVIDER).propertyName())
                 .setWithAttributes(false)
@@ -117,16 +120,17 @@ public class RunnerImplementedSourceTranslationTest {
           },
           {
             // Read payload with attributes and message id from runtime provided topic.
-            new RunnerImplementedSource(
-                new PubsubUnboundedSource(
-                    PubsubTestClient.createFactoryForCreateSubscription(),
-                    StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
-                    TOPIC_PROVIDER,
-                    null /* subscription */,
-                    TIMESTAMP_ATTRIBUTE /* timestampLabel */,
-                    ID_ATTRIBUTE /* idLabel */,
-                    true /* needsAttributes */,
-                    true /* needsMessageId */)),
+            Read.from(
+                new PubsubUnboundedSource.PubsubSource(
+                    new PubsubUnboundedSource(
+                        PubsubTestClient.createFactoryForCreateSubscription(),
+                        StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+                        TOPIC_PROVIDER,
+                        null /* subscription */,
+                        TIMESTAMP_ATTRIBUTE /* timestampLabel */,
+                        ID_ATTRIBUTE /* idLabel */,
+                        true /* needsAttributes */,
+                        true /* needsMessageId */))),
             PubSubReadPayload.newBuilder()
                 .setTopicRuntimeOverridden(((NestedValueProvider) TOPIC_PROVIDER).propertyName())
                 .setIdAttribute(ID_ATTRIBUTE)
@@ -136,16 +140,17 @@ public class RunnerImplementedSourceTranslationTest {
           },
           {
             // Read payload only from SUBSCRIPTION.
-            new RunnerImplementedSource(
-                new PubsubUnboundedSource(
-                    PubsubTestClient.createFactoryForCreateSubscription(),
-                    StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
-                    null /* topic */,
-                    StaticValueProvider.of(SUBSCRIPTION),
-                    null /* timestampLabel */,
-                    null /* idLabel */,
-                    false /* needsAttributes */,
-                    false /* needsMessageId */)),
+            Read.from(
+                new PubsubUnboundedSource.PubsubSource(
+                    new PubsubUnboundedSource(
+                        PubsubTestClient.createFactoryForCreateSubscription(),
+                        StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+                        null /* topic */,
+                        StaticValueProvider.of(SUBSCRIPTION),
+                        null /* timestampLabel */,
+                        null /* idLabel */,
+                        false /* needsAttributes */,
+                        false /* needsMessageId */))),
             PubSubReadPayload.newBuilder()
                 .setSubscription(SUBSCRIPTION.getFullPath())
                 .setWithAttributes(false)
@@ -153,16 +158,17 @@ public class RunnerImplementedSourceTranslationTest {
           },
           {
             // Read payload with attributes and message id from SUBSCRIPTION.
-            new RunnerImplementedSource(
-                new PubsubUnboundedSource(
-                    PubsubTestClient.createFactoryForCreateSubscription(),
-                    StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
-                    null /* topic */,
-                    StaticValueProvider.of(SUBSCRIPTION),
-                    TIMESTAMP_ATTRIBUTE /* timestampLabel */,
-                    ID_ATTRIBUTE /* idLabel */,
-                    true /* needsAttributes */,
-                    true /* needsMessageId */)),
+            Read.from(
+                new PubsubUnboundedSource.PubsubSource(
+                    new PubsubUnboundedSource(
+                        PubsubTestClient.createFactoryForCreateSubscription(),
+                        StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+                        null /* topic */,
+                        StaticValueProvider.of(SUBSCRIPTION),
+                        TIMESTAMP_ATTRIBUTE /* timestampLabel */,
+                        ID_ATTRIBUTE /* idLabel */,
+                        true /* needsAttributes */,
+                        true /* needsMessageId */))),
             PubSubReadPayload.newBuilder()
                 .setSubscription(SUBSCRIPTION.getFullPath())
                 .setIdAttribute(ID_ATTRIBUTE)
@@ -172,16 +178,17 @@ public class RunnerImplementedSourceTranslationTest {
           },
           {
             // Read payload only from runtime provided subscription.
-            new RunnerImplementedSource(
-                new PubsubUnboundedSource(
-                    PubsubTestClient.createFactoryForCreateSubscription(),
-                    StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
-                    null /* topic */,
-                    SUBSCRIPTION_PROVIDER,
-                    null /* timestampLabel */,
-                    null /* idLabel */,
-                    false /* needsAttributes */,
-                    false /* needsMessageId */)),
+            Read.from(
+                new PubsubUnboundedSource.PubsubSource(
+                    new PubsubUnboundedSource(
+                        PubsubTestClient.createFactoryForCreateSubscription(),
+                        StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+                        null /* topic */,
+                        SUBSCRIPTION_PROVIDER,
+                        null /* timestampLabel */,
+                        null /* idLabel */,
+                        false /* needsAttributes */,
+                        false /* needsMessageId */))),
             PubSubReadPayload.newBuilder()
                 .setSubscriptionRuntimeOverridden(
                     ((NestedValueProvider) SUBSCRIPTION_PROVIDER).propertyName())
@@ -190,16 +197,17 @@ public class RunnerImplementedSourceTranslationTest {
           },
           {
             // Read payload with attributes and message id from runtime provided subscription.
-            new RunnerImplementedSource(
-                new PubsubUnboundedSource(
-                    PubsubTestClient.createFactoryForCreateSubscription(),
-                    StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
-                    null /* topic */,
-                    SUBSCRIPTION_PROVIDER,
-                    TIMESTAMP_ATTRIBUTE /* timestampLabel */,
-                    ID_ATTRIBUTE /* idLabel */,
-                    true /* needsAttributes */,
-                    true /* needsMessageId */)),
+            Read.from(
+                new PubsubUnboundedSource.PubsubSource(
+                    new PubsubUnboundedSource(
+                        PubsubTestClient.createFactoryForCreateSubscription(),
+                        StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+                        null /* topic */,
+                        SUBSCRIPTION_PROVIDER,
+                        TIMESTAMP_ATTRIBUTE /* timestampLabel */,
+                        ID_ATTRIBUTE /* idLabel */,
+                        true /* needsAttributes */,
+                        true /* needsMessageId */))),
             PubSubReadPayload.newBuilder()
                 .setSubscriptionRuntimeOverridden(
                     ((NestedValueProvider) SUBSCRIPTION_PROVIDER).propertyName())
@@ -212,24 +220,25 @@ public class RunnerImplementedSourceTranslationTest {
   }
 
   @Parameter(0)
-  public RunnerImplementedSource runnerImplementedSource;
+  public Read.Unbounded<byte[]> readFromPubSub;
 
   @Parameter(1)
   public PubSubReadPayload pubsubReadPayload;
 
   @Test
   public void testTranslateSourceToFunctionSpec() throws Exception {
-    PCollection<byte[]> output = pipeline.apply(runnerImplementedSource);
-    AppliedPTransform<?, ?, RunnerImplementedSource> appliedPTransform =
+    PCollection<byte[]> output = pipeline.apply(readFromPubSub);
+    AppliedPTransform<?, ?, Read.Unbounded<byte[]>> appliedPTransform =
         AppliedPTransform.of(
-            "sink",
+            "ReadFromPubsub",
             PValues.expandInput(pipeline.begin()),
             PValues.expandOutput(output),
-            runnerImplementedSource,
+            readFromPubSub,
             pipeline);
     SdkComponents components = SdkComponents.create();
     components.registerEnvironment(Environments.createDockerEnvironment("java"));
-    RunnerApi.FunctionSpec spec = sourceTranslator.translate(appliedPTransform, components);
+    RunnerApi.FunctionSpec spec =
+        sourceTranslator.translate((AppliedPTransform) appliedPTransform, components);
     assertEquals(PTransformTranslation.PUBSUB_READ, spec.getUrn());
     PubSubReadPayload result = PubSubReadPayload.parseFrom(spec.getPayload());
     assertEquals(pubsubReadPayload, result);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java
similarity index 80%
rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslationTest.java
rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java
index 52ebd22..872e671 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslationTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java
@@ -25,8 +25,9 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubWritePayload;
 import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.sdk.io.gcp.pubsub.PubSubPayloadTranslation.PubSubWritePayloadTranslator;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSinkTranslation.RunnerImplementedSinkTranslator;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink.PubsubSink;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -44,18 +45,18 @@ import org.junit.runners.JUnit4;
 
 /** Test RunnerImplementedSinkTranslator. */
 @RunWith(JUnit4.class)
-public class RunnerImplementedSinkTranslationTest {
+public class PubSubWritePayloadTranslationTest {
   private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
   private static final String ID_ATTRIBUTE = "id";
   private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
-  private final RunnerImplementedSinkTranslator sinkTranslator =
-      new RunnerImplementedSinkTranslator();
+  private final PubSubPayloadTranslation.PubSubWritePayloadTranslator sinkTranslator =
+      new PubSubWritePayloadTranslator();
 
   @Rule public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
   @Test
   public void testTranslateSinkWithTopic() throws Exception {
-    PubsubUnboundedSink pubsubSink =
+    PubsubUnboundedSink pubsubUnboundedSink =
         new PubsubUnboundedSink(
             null,
             StaticValueProvider.of(TOPIC),
@@ -66,16 +67,12 @@ public class RunnerImplementedSinkTranslationTest {
             0,
             Duration.ZERO,
             null);
-    RunnerImplementedSink runnerImplementedSink = new RunnerImplementedSink(pubsubSink);
+    PubsubUnboundedSink.PubsubSink pubsubSink = new PubsubSink(pubsubUnboundedSink);
     PCollection<byte[]> input = pipeline.apply(Create.of(new byte[0]));
-    PDone output = input.apply(runnerImplementedSink);
-    AppliedPTransform<?, ?, RunnerImplementedSink> appliedPTransform =
+    PDone output = input.apply(pubsubSink);
+    AppliedPTransform<?, ?, PubsubSink> appliedPTransform =
         AppliedPTransform.of(
-            "sink",
-            PValues.expandInput(input),
-            PValues.expandOutput(output),
-            runnerImplementedSink,
-            pipeline);
+            "sink", PValues.expandInput(input), PValues.expandOutput(output), pubsubSink, pipeline);
     SdkComponents components = SdkComponents.create();
     components.registerEnvironment(Environments.createDockerEnvironment("java"));
     RunnerApi.FunctionSpec spec = sinkTranslator.translate(appliedPTransform, components);
@@ -91,19 +88,15 @@ public class RunnerImplementedSinkTranslationTest {
   @Test
   public void testTranslateSinkWithTopicOverridden() throws Exception {
     ValueProvider<TopicPath> runtimeProvider = pipeline.newProvider(TOPIC);
-    PubsubUnboundedSink pubsubSink =
+    PubsubUnboundedSink pubsubUnboundedSinkSink =
         new PubsubUnboundedSink(
             null, runtimeProvider, TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 0, 0, 0, Duration.ZERO, null);
-    RunnerImplementedSink runnerImplementedSink = new RunnerImplementedSink(pubsubSink);
+    PubsubSink pubsubSink = new PubsubSink(pubsubUnboundedSinkSink);
     PCollection<byte[]> input = pipeline.apply(Create.of(new byte[0]));
-    PDone output = input.apply(runnerImplementedSink);
-    AppliedPTransform<?, ?, RunnerImplementedSink> appliedPTransform =
+    PDone output = input.apply(pubsubSink);
+    AppliedPTransform<?, ?, PubsubSink> appliedPTransform =
         AppliedPTransform.of(
-            "sink",
-            PValues.expandInput(input),
-            PValues.expandOutput(output),
-            runnerImplementedSink,
-            pipeline);
+            "sink", PValues.expandInput(input), PValues.expandOutput(output), pubsubSink, pipeline);
     SdkComponents components = SdkComponents.create();
     components.registerEnvironment(Environments.createDockerEnvironment("java"));
     RunnerApi.FunctionSpec spec = sinkTranslator.translate(appliedPTransform, components);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
index e9c2a15..7e7e2e2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
@@ -159,13 +159,17 @@ public class PubsubIOExternalTest {
     RunnerApi.PTransform writeComposite =
         result.getComponents().getTransformsOrThrow(transform.getSubtransforms(1));
 
-    // test_namespacetest/PubsubUnboundedSink/PubsubUnboundedSink.Writer
+    // test_namespacetest/PubsubUnboundedSink/PubsubSink
     RunnerApi.PTransform writeComposite2 =
-        result.getComponents().getTransformsOrThrow(writeComposite.getSubtransforms(3));
+        result.getComponents().getTransformsOrThrow(writeComposite.getSubtransforms(1));
 
-    // test_namespacetest/PubsubUnboundedSink/PubsubUnboundedSink.Writer/ParMultiDo(Writer)
+    // test_namespacetest/PubsubUnboundedSink/PubsubSink/PubsubUnboundedSink.Writer
+    RunnerApi.PTransform writeComposite3 =
+        result.getComponents().getTransformsOrThrow(writeComposite2.getSubtransforms(3));
+
+    // test_namespacetest/PubsubUnboundedSink/PubsubSink/PubsubUnboundedSink.Writer/ParMultiDo(Writer)
     RunnerApi.PTransform writeParDo =
-        result.getComponents().getTransformsOrThrow(writeComposite2.getSubtransforms(0));
+        result.getComponents().getTransformsOrThrow(writeComposite3.getSubtransforms(0));
 
     RunnerApi.ParDoPayload parDoPayload =
         RunnerApi.ParDoPayload.parseFrom(writeParDo.getSpec().getPayload());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
index 8c25c40..a9564d3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -32,6 +32,7 @@ import static org.junit.Assert.assertTrue;
 
 import com.google.api.client.util.Clock;
 import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -120,8 +121,12 @@ public class PubsubUnboundedSourceTest {
     factory = null;
   }
 
-  private static String data(PubsubMessage message) {
-    return new String(message.getPayload(), StandardCharsets.UTF_8);
+  private static String data(byte[] message, boolean payloadOnly) throws Exception {
+    if (payloadOnly) {
+      return new String(message, StandardCharsets.UTF_8);
+    }
+    PubsubMessage data = PubsubMessage.parseFrom(message);
+    return new String(data.getData().toByteArray(), StandardCharsets.UTF_8);
   }
 
   @Test
@@ -134,12 +139,16 @@ public class PubsubUnboundedSourceTest {
   }
 
   @Test
-  public void readOneMessage() throws IOException {
+  public void readOneMessage() throws Exception {
     setupOneMessage();
     PubsubReader reader = primSource.createReader(p.getOptions(), null);
     // Read one message.
     assertTrue(reader.start());
-    assertEquals(DATA, data(reader.getCurrent()));
+    assertEquals(
+        DATA,
+        data(
+            reader.getCurrent(),
+            !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
     assertFalse(reader.advance());
     // ACK the message.
     PubsubCheckpoint checkpoint = reader.getCheckpointMark();
@@ -148,18 +157,26 @@ public class PubsubUnboundedSourceTest {
   }
 
   @Test
-  public void timeoutAckAndRereadOneMessage() throws IOException {
+  public void timeoutAckAndRereadOneMessage() throws Exception {
     setupOneMessage();
     PubsubReader reader = primSource.createReader(p.getOptions(), null);
     PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
     assertTrue(reader.start());
-    assertEquals(DATA, data(reader.getCurrent()));
+    assertEquals(
+        DATA,
+        data(
+            reader.getCurrent(),
+            !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
     // Let the ACK deadline for the above expire.
     now.addAndGet(65 * 1000);
     pubsubClient.advance();
     // We'll now receive the same message again.
     assertTrue(reader.advance());
-    assertEquals(DATA, data(reader.getCurrent()));
+    assertEquals(
+        DATA,
+        data(
+            reader.getCurrent(),
+            !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
     assertFalse(reader.advance());
     // Now ACK the message.
     PubsubCheckpoint checkpoint = reader.getCheckpointMark();
@@ -168,13 +185,17 @@ public class PubsubUnboundedSourceTest {
   }
 
   @Test
-  public void extendAck() throws IOException {
+  public void extendAck() throws Exception {
     setupOneMessage();
     PubsubReader reader = primSource.createReader(p.getOptions(), null);
     PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
     // Pull the first message but don't take a checkpoint for it.
     assertTrue(reader.start());
-    assertEquals(DATA, data(reader.getCurrent()));
+    assertEquals(
+        DATA,
+        data(
+            reader.getCurrent(),
+            !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
     // Extend the ack
     now.addAndGet(55 * 1000);
     pubsubClient.advance();
@@ -190,13 +211,17 @@ public class PubsubUnboundedSourceTest {
   }
 
   @Test
-  public void timeoutAckExtensions() throws IOException {
+  public void timeoutAckExtensions() throws Exception {
     setupOneMessage();
     PubsubReader reader = primSource.createReader(p.getOptions(), null);
     PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
     // Pull the first message but don't take a checkpoint for it.
     assertTrue(reader.start());
-    assertEquals(DATA, data(reader.getCurrent()));
+    assertEquals(
+        DATA,
+        data(
+            reader.getCurrent(),
+            !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
     // Extend the ack.
     now.addAndGet(55 * 1000);
     pubsubClient.advance();
@@ -212,7 +237,11 @@ public class PubsubUnboundedSourceTest {
     pubsubClient.advance();
     // Reread the same message.
     assertTrue(reader.advance());
-    assertEquals(DATA, data(reader.getCurrent()));
+    assertEquals(
+        DATA,
+        data(
+            reader.getCurrent(),
+            !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
     // Now ACK the message.
     PubsubCheckpoint checkpoint = reader.getCheckpointMark();
     checkpoint.finalizeCheckpoint();
@@ -220,7 +249,7 @@ public class PubsubUnboundedSourceTest {
   }
 
   @Test
-  public void multipleReaders() throws IOException {
+  public void multipleReaders() throws Exception {
     List<IncomingMessage> incoming = new ArrayList<>();
     for (int i = 0; i < 2; i++) {
       String data = String.format("data_%d", i);
@@ -239,7 +268,11 @@ public class PubsubUnboundedSourceTest {
     PubsubReader reader = primSource.createReader(p.getOptions(), null);
     // Consume two messages, only read one.
     assertTrue(reader.start());
-    assertEquals("data_0", data(reader.getCurrent()));
+    assertEquals(
+        "data_0",
+        data(
+            reader.getCurrent(),
+            !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
 
     // Grab checkpoint.
     PubsubCheckpoint checkpoint = reader.getCheckpointMark();
@@ -249,7 +282,11 @@ public class PubsubUnboundedSourceTest {
 
     // Read second message.
     assertTrue(reader.advance());
-    assertEquals("data_1", data(reader.getCurrent()));
+    assertEquals(
+        "data_1",
+        data(
+            reader.getCurrent(),
+            !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
 
     // Restore from checkpoint.
     byte[] checkpointBytes =
@@ -262,7 +299,11 @@ public class PubsubUnboundedSourceTest {
     // Re-read second message.
     reader = primSource.createReader(p.getOptions(), checkpoint);
     assertTrue(reader.start());
-    assertEquals("data_1", data(reader.getCurrent()));
+    assertEquals(
+        "data_1",
+        data(
+            reader.getCurrent(),
+            !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
 
     // We are done.
     assertFalse(reader.advance());
@@ -278,7 +319,7 @@ public class PubsubUnboundedSourceTest {
   }
 
   @Test
-  public void readManyMessages() throws IOException {
+  public void readManyMessages() throws Exception {
     Map<String, Integer> dataToMessageNum = new HashMap<>();
 
     final int m = 97;
@@ -315,7 +356,10 @@ public class PubsubUnboundedSourceTest {
       // We'll checkpoint and ack within the 2min limit.
       now.addAndGet(30);
       pubsubClient.advance();
-      String data = data(reader.getCurrent());
+      String data =
+          data(
+              reader.getCurrent(),
+              !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId()));
       Integer messageNum = dataToMessageNum.remove(data);
       // No duplicate messages.
       assertNotNull(messageNum);