You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/01/10 19:22:07 UTC

[beam] branch revert-10474-master created (now 1b35c61)

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a change to branch revert-10474-master
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at 1b35c61  Revert "[BEAM-8932] [BEAM-9036] Revert reverted commit to use PubsubMessage as the canonical type in beam client (#10474)"

This branch includes the following new commits:

     new 1b35c61  Revert "[BEAM-8932] [BEAM-9036] Revert reverted commit to use PubsubMessage as the canonical type in beam client (#10474)"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Revert "[BEAM-8932] [BEAM-9036] Revert reverted commit to use PubsubMessage as the canonical type in beam client (#10474)"

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch revert-10474-master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1b35c617428f76fb9b3723d00829be89e8f70abe
Author: Boyuan Zhang <36...@users.noreply.github.com>
AuthorDate: Fri Jan 10 11:21:58 2020 -0800

    Revert "[BEAM-8932] [BEAM-9036] Revert reverted commit to use PubsubMessage as the canonical type in beam client (#10474)"
    
    This reverts commit 784d18b7ac89f87dd7fbf2861ee877f5b6070276.
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   4 +-
 .../beam/sdk/io/gcp/pubsub/PubsubClient.java       | 139 ++++++++++++++++-----
 .../beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java   |  29 +++--
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java    |   9 +-
 .../beam/sdk/io/gcp/pubsub/PubsubJsonClient.java   |  38 ++----
 .../beam/sdk/io/gcp/pubsub/PubsubTestClient.java   |  11 +-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java     |  23 ++--
 .../sdk/io/gcp/pubsub/PubsubUnboundedSource.java   |  33 +++--
 .../apache/beam/sdk/io/gcp/pubsub/TestPubsub.java  |  20 +--
 .../beam/sdk/io/gcp/pubsub/TestPubsubSignal.java   |   6 +-
 .../apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java  |   1 -
 .../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java    |  19 ++-
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java       |   8 +-
 .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java    |  76 ++---------
 .../sdk/io/gcp/pubsub/PubsubTestClientTest.java    |  31 ++---
 .../sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java |  39 ++----
 .../io/gcp/pubsub/PubsubUnboundedSourceTest.java   |  28 ++---
 17 files changed, 227 insertions(+), 287 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 7e1cf0f..c2d1464 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -365,7 +365,7 @@ class BeamModulePlugin implements Plugin<Project> {
     def cassandra_driver_version = "3.8.0"
     def classgraph_version = "4.8.56"
     def generated_grpc_beta_version = "0.44.0"
-    def generated_grpc_ga_version = "1.83.0"
+    def generated_grpc_ga_version = "1.43.0"
     def generated_grpc_dc_beta_version = "0.27.0-alpha"
     def google_auth_version = "0.19.0"
     def google_clients_version = "1.28.0"
@@ -444,7 +444,7 @@ class BeamModulePlugin implements Plugin<Project> {
         google_api_services_clouddebugger           : "com.google.apis:google-api-services-clouddebugger:v2-rev20181114-$google_clients_version",
         google_api_services_cloudresourcemanager    : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20181015-$google_clients_version",
         google_api_services_dataflow                : "com.google.apis:google-api-services-dataflow:v1b3-rev20190927-$google_clients_version",
-        google_api_services_pubsub                  : "com.google.apis:google-api-services-pubsub:v1-rev20191111-$google_clients_version",
+        google_api_services_pubsub                  : "com.google.apis:google-api-services-pubsub:v1-rev20181213-$google_clients_version",
         google_api_services_storage                 : "com.google.apis:google-api-services-storage:v1-rev20181109-$google_clients_version",
         google_auth_library_credentials             : "com.google.auth:google-auth-library-credentials:$google_auth_version",
         google_auth_library_oauth2_http             : "com.google.auth:google-auth-library-oauth2-http:$google_auth_version",
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index 6f0f54d..07d6da6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -21,12 +21,10 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import com.google.api.client.util.DateTime;
-import com.google.auto.value.AutoValue;
-import com.google.protobuf.ByteString;
-import com.google.pubsub.v1.PubsubMessage;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -300,37 +298,59 @@ public abstract class PubsubClient implements Closeable {
    * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. Java
    * serialization is never used for non-test clients.
    */
-  @AutoValue
-  public abstract static class OutgoingMessage implements Serializable {
+  public static class OutgoingMessage implements Serializable {
+    /** Underlying (encoded) element. */
+    public final byte[] elementBytes;
 
-    /** Underlying Message. May not have publish timestamp set. */
-    public abstract PubsubMessage message();
+    public final Map<String, String> attributes;
 
     /** Timestamp for element (ms since epoch). */
-    public abstract long timestampMsSinceEpoch();
+    public final long timestampMsSinceEpoch;
 
     /**
      * If using an id attribute, the record id to associate with this record's metadata so the
      * receiver can reject duplicates. Otherwise {@literal null}.
      */
-    @Nullable
-    public abstract String recordId();
+    @Nullable public final String recordId;
 
-    public static OutgoingMessage of(
-        PubsubMessage message, long timestampMsSinceEpoch, @Nullable String recordId) {
-      return new AutoValue_PubsubClient_OutgoingMessage(message, timestampMsSinceEpoch, recordId);
-    }
-
-    public static OutgoingMessage of(
-        org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage message,
+    public OutgoingMessage(
+        byte[] elementBytes,
+        Map<String, String> attributes,
         long timestampMsSinceEpoch,
         @Nullable String recordId) {
-      PubsubMessage.Builder builder =
-          PubsubMessage.newBuilder().setData(ByteString.copyFrom(message.getPayload()));
-      if (message.getAttributeMap() != null) {
-        builder.putAllAttributes(message.getAttributeMap());
+      this.elementBytes = elementBytes;
+      this.attributes = attributes;
+      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+      this.recordId = recordId;
+    }
+
+    @Override
+    public String toString() {
+      return String.format(
+          "OutgoingMessage(%db, %dms)", elementBytes.length, timestampMsSinceEpoch);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
       }
-      return of(builder.build(), timestampMsSinceEpoch, recordId);
+
+      OutgoingMessage that = (OutgoingMessage) o;
+
+      return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+          && Arrays.equals(elementBytes, that.elementBytes)
+          && Objects.equal(attributes, that.attributes)
+          && Objects.equal(recordId, that.recordId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(
+          Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch, recordId);
     }
   }
 
@@ -340,35 +360,86 @@ public abstract class PubsubClient implements Closeable {
    * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. Java
    * serialization is never used for non-test clients.
    */
-  @AutoValue
-  abstract static class IncomingMessage implements Serializable {
+  static class IncomingMessage implements Serializable {
+    /** Underlying (encoded) element. */
+    public final byte[] elementBytes;
 
-    /** Underlying Message. */
-    public abstract PubsubMessage message();
+    public Map<String, String> attributes;
 
     /**
      * Timestamp for element (ms since epoch). Either Pubsub's processing time, or the custom
      * timestamp associated with the message.
      */
-    public abstract long timestampMsSinceEpoch();
+    public final long timestampMsSinceEpoch;
 
     /** Timestamp (in system time) at which we requested the message (ms since epoch). */
-    public abstract long requestTimeMsSinceEpoch();
+    public final long requestTimeMsSinceEpoch;
 
     /** Id to pass back to Pubsub to acknowledge receipt of this message. */
-    public abstract String ackId();
+    public final String ackId;
 
     /** Id to pass to the runner to distinguish this message from all others. */
-    public abstract String recordId();
+    public final String recordId;
 
-    public static IncomingMessage of(
-        PubsubMessage message,
+    public IncomingMessage(
+        byte[] elementBytes,
+        Map<String, String> attributes,
         long timestampMsSinceEpoch,
         long requestTimeMsSinceEpoch,
         String ackId,
         String recordId) {
-      return new AutoValue_PubsubClient_IncomingMessage(
-          message, timestampMsSinceEpoch, requestTimeMsSinceEpoch, ackId, recordId);
+      this.elementBytes = elementBytes;
+      this.attributes = attributes;
+      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+      this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
+      this.ackId = ackId;
+      this.recordId = recordId;
+    }
+
+    public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) {
+      return new IncomingMessage(
+          elementBytes,
+          attributes,
+          timestampMsSinceEpoch,
+          requestTimeMsSinceEpoch,
+          ackId,
+          recordId);
+    }
+
+    @Override
+    public String toString() {
+      return String.format(
+          "IncomingMessage(%db, %dms)", elementBytes.length, timestampMsSinceEpoch);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      IncomingMessage that = (IncomingMessage) o;
+
+      return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+          && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch
+          && ackId.equals(that.ackId)
+          && recordId.equals(that.recordId)
+          && Arrays.equals(elementBytes, that.elementBytes)
+          && Objects.equal(attributes, that.attributes);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(
+          Arrays.hashCode(elementBytes),
+          attributes,
+          timestampMsSinceEpoch,
+          requestTimeMsSinceEpoch,
+          ackId,
+          recordId);
     }
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
index a3b6b8d..ae3fa02 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.pubsub;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import com.google.auth.Credentials;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.Timestamp;
 import com.google.pubsub.v1.AcknowledgeRequest;
 import com.google.pubsub.v1.DeleteSubscriptionRequest;
@@ -212,15 +213,21 @@ public class PubsubGrpcClient extends PubsubClient {
   public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
     PublishRequest.Builder request = PublishRequest.newBuilder().setTopic(topic.getPath());
     for (OutgoingMessage outgoingMessage : outgoingMessages) {
-      PubsubMessage.Builder message = outgoingMessage.message().toBuilder();
+      PubsubMessage.Builder message =
+          PubsubMessage.newBuilder().setData(ByteString.copyFrom(outgoingMessage.elementBytes));
+
+      if (outgoingMessage.attributes != null) {
+        message.putAllAttributes(outgoingMessage.attributes);
+      }
 
       if (timestampAttribute != null) {
-        message.putAttributes(
-            timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch()));
+        message
+            .getMutableAttributes()
+            .put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
       }
 
-      if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId())) {
-        message.putAttributes(idAttribute, outgoingMessage.recordId());
+      if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+        message.getMutableAttributes().put(idAttribute, outgoingMessage.recordId);
       }
 
       request.addMessages(message);
@@ -252,6 +259,9 @@ public class PubsubGrpcClient extends PubsubClient {
       PubsubMessage pubsubMessage = message.getMessage();
       @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
 
+      // Payload.
+      byte[] elementBytes = pubsubMessage.getData().toByteArray();
+
       // Timestamp.
       String pubsubTimestampString = null;
       Timestamp timestampProto = pubsubMessage.getPublishTime();
@@ -277,8 +287,13 @@ public class PubsubGrpcClient extends PubsubClient {
       }
 
       incomingMessages.add(
-          IncomingMessage.of(
-              pubsubMessage, timestampMsSinceEpoch, requestTimeMsSinceEpoch, ackId, recordId));
+          new IncomingMessage(
+              elementBytes,
+              attributes,
+              timestampMsSinceEpoch,
+              requestTimeMsSinceEpoch,
+              ackId,
+              recordId));
     }
     return incomingMessages;
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 5f6d044..da5266f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -1303,14 +1303,7 @@ public class PubsubIO {
         }
 
         // NOTE: The record id is always null.
-        output.add(
-            OutgoingMessage.of(
-                com.google.pubsub.v1.PubsubMessage.newBuilder()
-                    .setData(ByteString.copyFrom(payload))
-                    .putAllAttributes(attributes)
-                    .build(),
-                c.timestamp().getMillis(),
-                null));
+        output.add(new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null));
         currentOutputBytes += payload.length;
       }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index 1ae5a55..136b1d2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -39,10 +39,8 @@ import com.google.api.services.pubsub.model.Topic;
 import com.google.auth.Credentials;
 import com.google.auth.http.HttpCredentialsAdapter;
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -125,12 +123,8 @@ public class PubsubJsonClient extends PubsubClient {
   public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
     List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
     for (OutgoingMessage outgoingMessage : outgoingMessages) {
-      PubsubMessage pubsubMessage =
-          new PubsubMessage().encodeData(outgoingMessage.message().getData().toByteArray());
+      PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
       pubsubMessage.setAttributes(getMessageAttributes(outgoingMessage));
-      if (!outgoingMessage.message().getOrderingKey().isEmpty()) {
-        pubsubMessage.put("orderingKey", outgoingMessage.message().getOrderingKey());
-      }
       pubsubMessages.add(pubsubMessage);
     }
     PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
@@ -141,16 +135,16 @@ public class PubsubJsonClient extends PubsubClient {
 
   private Map<String, String> getMessageAttributes(OutgoingMessage outgoingMessage) {
     Map<String, String> attributes = null;
-    if (outgoingMessage.message().getAttributesMap() == null) {
+    if (outgoingMessage.attributes == null) {
       attributes = new TreeMap<>();
     } else {
-      attributes = new TreeMap<>(outgoingMessage.message().getAttributesMap());
+      attributes = new TreeMap<>(outgoingMessage.attributes);
     }
     if (timestampAttribute != null) {
-      attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch()));
+      attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
     }
-    if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId())) {
-      attributes.put(idAttribute, outgoingMessage.recordId());
+    if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+      attributes.put(idAttribute, outgoingMessage.recordId);
     }
     return attributes;
   }
@@ -172,12 +166,7 @@ public class PubsubJsonClient extends PubsubClient {
     List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessages().size());
     for (ReceivedMessage message : response.getReceivedMessages()) {
       PubsubMessage pubsubMessage = message.getMessage();
-      Map<String, String> attributes;
-      if (pubsubMessage.getAttributes() != null) {
-        attributes = pubsubMessage.getAttributes();
-      } else {
-        attributes = new HashMap<>();
-      }
+      @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
 
       // Payload.
       byte[] elementBytes = pubsubMessage.getData() == null ? null : pubsubMessage.decodeData();
@@ -195,7 +184,7 @@ public class PubsubJsonClient extends PubsubClient {
 
       // Record id, if any.
       @Nullable String recordId = null;
-      if (idAttribute != null) {
+      if (idAttribute != null && attributes != null) {
         recordId = attributes.get(idAttribute);
       }
       if (Strings.isNullOrEmpty(recordId)) {
@@ -203,15 +192,10 @@ public class PubsubJsonClient extends PubsubClient {
         recordId = pubsubMessage.getMessageId();
       }
 
-      com.google.pubsub.v1.PubsubMessage.Builder protoMessage =
-          com.google.pubsub.v1.PubsubMessage.newBuilder();
-      protoMessage.setData(ByteString.copyFrom(elementBytes));
-      protoMessage.putAllAttributes(attributes);
-      protoMessage.setOrderingKey(
-          (String) pubsubMessage.getUnknownKeys().getOrDefault("orderingKey", ""));
       incomingMessages.add(
-          IncomingMessage.of(
-              protoMessage.build(),
+          new IncomingMessage(
+              elementBytes,
+              attributes,
               timestampMsSinceEpoch,
               requestTimeMsSinceEpoch,
               ackId,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
index c3b915d..6b20b56 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
@@ -309,17 +309,12 @@ public class PubsubTestClient extends PubsubClient implements Serializable {
         IncomingMessage incomingMessage = pendItr.next();
         pendItr.remove();
         IncomingMessage incomingMessageWithRequestTime =
-            IncomingMessage.of(
-                incomingMessage.message(),
-                incomingMessage.timestampMsSinceEpoch(),
-                requestTimeMsSinceEpoch,
-                incomingMessage.ackId(),
-                incomingMessage.recordId());
+            incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
         incomingMessages.add(incomingMessageWithRequestTime);
         STATE.pendingAckIncomingMessages.put(
-            incomingMessageWithRequestTime.ackId(), incomingMessageWithRequestTime);
+            incomingMessageWithRequestTime.ackId, incomingMessageWithRequestTime);
         STATE.ackDeadline.put(
-            incomingMessageWithRequestTime.ackId(),
+            incomingMessageWithRequestTime.ackId,
             requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000);
         if (incomingMessages.size() >= batchSize) {
           break;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 8be8c56..1258d0b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -37,7 +38,6 @@ import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
@@ -101,18 +101,19 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
     @Override
     public void encode(OutgoingMessage value, OutputStream outStream)
         throws CoderException, IOException {
-      ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).encode(value.message(), outStream);
-      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch(), outStream);
-      RECORD_ID_CODER.encode(value.recordId(), outStream);
+      ByteArrayCoder.of().encode(value.elementBytes, outStream);
+      ATTRIBUTES_CODER.encode(value.attributes, outStream);
+      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream);
+      RECORD_ID_CODER.encode(value.recordId, outStream);
     }
 
     @Override
     public OutgoingMessage decode(InputStream inStream) throws CoderException, IOException {
-      com.google.pubsub.v1.PubsubMessage message =
-          ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).decode(inStream);
+      byte[] elementBytes = ByteArrayCoder.of().decode(inStream);
+      Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream);
       long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream);
       @Nullable String recordId = RECORD_ID_CODER.decode(inStream);
-      return OutgoingMessage.of(message, timestampMsSinceEpoch, recordId);
+      return new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId);
     }
   }
 
@@ -153,6 +154,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
       elementCounter.inc();
       PubsubMessage message = c.element();
       byte[] elementBytes = message.getPayload();
+      Map<String, String> attributes = message.getAttributeMap();
 
       long timestampMsSinceEpoch = c.timestamp().getMillis();
       @Nullable String recordId = null;
@@ -173,7 +175,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
       c.output(
           KV.of(
               ThreadLocalRandom.current().nextInt(numShards),
-              OutgoingMessage.of(message, timestampMsSinceEpoch, recordId)));
+              new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId)));
     }
 
     @Override
@@ -244,8 +246,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
       List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize);
       int bytes = 0;
       for (OutgoingMessage message : c.element().getValue()) {
-        if (!pubsubMessages.isEmpty()
-            && bytes + message.message().getData().size() > publishBatchBytes) {
+        if (!pubsubMessages.isEmpty() && bytes + message.elementBytes.length > publishBatchBytes) {
           // Break large (in bytes) batches into smaller.
           // (We've already broken by batch size using the trigger below, though that may
           // run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since
@@ -256,7 +257,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
           bytes = 0;
         }
         pubsubMessages.add(message);
-        bytes += message.message().getData().size();
+        bytes += message.elementBytes.length;
       }
       if (!pubsubMessages.isEmpty()) {
         // BLOCKS until published.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 230161c..d8abfe1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -727,18 +727,18 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
       // Capture the received messages.
       for (PubsubClient.IncomingMessage incomingMessage : receivedMessages) {
         notYetRead.add(incomingMessage);
-        notYetReadBytes += incomingMessage.message().getData().size();
+        notYetReadBytes += incomingMessage.elementBytes.length;
         inFlight.put(
-            incomingMessage.ackId(),
+            incomingMessage.ackId,
             new InFlightState(requestTimeMsSinceEpoch, deadlineMsSinceEpoch));
         numReceived++;
         numReceivedRecently.add(requestTimeMsSinceEpoch, 1L);
         minReceivedTimestampMsSinceEpoch.add(
-            requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch());
+            requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch);
         maxReceivedTimestampMsSinceEpoch.add(
-            requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch());
+            requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch);
         minUnreadTimestampMsSinceEpoch.add(
-            requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch());
+            requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch);
       }
     }
 
@@ -837,7 +837,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
 
       if (current != null) {
         // Current is consumed. It can no longer contribute to holding back the watermark.
-        minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch());
+        minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch);
         current = null;
       }
 
@@ -864,18 +864,18 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
         // Try again later.
         return false;
       }
-      notYetReadBytes -= current.message().getData().size();
+      notYetReadBytes -= current.elementBytes.length;
       checkState(notYetReadBytes >= 0);
       long nowMsSinceEpoch = now();
-      numReadBytes.add(nowMsSinceEpoch, current.message().getData().size());
-      minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, current.timestampMsSinceEpoch());
-      if (current.timestampMsSinceEpoch() < lastWatermarkMsSinceEpoch) {
+      numReadBytes.add(nowMsSinceEpoch, current.elementBytes.length);
+      minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, current.timestampMsSinceEpoch);
+      if (current.timestampMsSinceEpoch < lastWatermarkMsSinceEpoch) {
         numLateMessages.add(nowMsSinceEpoch, 1L);
       }
 
       // Current message can be considered 'read' and will be persisted by the next
       // checkpoint. So it is now safe to ACK back to Pubsub.
-      safeToAckIds.add(current.ackId());
+      safeToAckIds.add(current.ackId);
       return true;
     }
 
@@ -884,10 +884,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
       if (current == null) {
         throw new NoSuchElementException();
       }
-      return new PubsubMessage(
-          current.message().getData().toByteArray(),
-          current.message().getAttributesMap(),
-          current.recordId());
+      return new PubsubMessage(current.elementBytes, current.attributes, current.recordId);
     }
 
     @Override
@@ -895,7 +892,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
       if (current == null) {
         throw new NoSuchElementException();
       }
-      return new Instant(current.timestampMsSinceEpoch());
+      return new Instant(current.timestampMsSinceEpoch);
     }
 
     @Override
@@ -903,7 +900,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
       if (current == null) {
         throw new NoSuchElementException();
       }
-      return current.recordId().getBytes(StandardCharsets.UTF_8);
+      return current.recordId.getBytes(StandardCharsets.UTF_8);
     }
 
     /**
@@ -987,7 +984,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
       List<String> snapshotSafeToAckIds = Lists.newArrayList(safeToAckIds);
       List<String> snapshotNotYetReadIds = new ArrayList<>(notYetRead.size());
       for (PubsubClient.IncomingMessage incomingMessage : notYetRead) {
-        snapshotNotYetReadIds.add(incomingMessage.ackId());
+        snapshotNotYetReadIds.add(incomingMessage.ackId);
       }
       if (outer.subscriptionPath == null) {
         // need to include the subscription in case we resume, as it's not stored in the source.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
index e1e8711..1e75d43 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
@@ -22,14 +22,12 @@ import static org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.projectPathFromPath
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
@@ -207,16 +205,11 @@ public class TestPubsub implements TestRule {
     if (!messages.isEmpty()) {
       pubsub.acknowledge(
           subscriptionPath,
-          messages.stream().map(IncomingMessage::ackId).collect(ImmutableList.toImmutableList()));
+          messages.stream().map(msg -> msg.ackId).collect(ImmutableList.toImmutableList()));
     }
 
     return messages.stream()
-        .map(
-            msg ->
-                new PubsubMessage(
-                    msg.message().getData().toByteArray(),
-                    msg.message().getAttributesMap(),
-                    msg.recordId()))
+        .map(msg -> new PubsubMessage(msg.elementBytes, msg.attributes, msg.recordId))
         .collect(ImmutableList.toImmutableList());
   }
 
@@ -299,12 +292,7 @@ public class TestPubsub implements TestRule {
   }
 
   private PubsubClient.OutgoingMessage toOutgoingMessage(PubsubMessage message) {
-    return PubsubClient.OutgoingMessage.of(
-        com.google.pubsub.v1.PubsubMessage.newBuilder()
-            .setData(ByteString.copyFrom(message.getPayload()))
-            .putAllAttributes(message.getAttributeMap())
-            .build(),
-        DateTime.now().getMillis(),
-        null);
+    return new PubsubClient.OutgoingMessage(
+        message.getPayload(), message.getAttributeMap(), DateTime.now().getMillis(), null);
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
index de4a715..f4f8b18 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsub;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.stream.Collectors.toList;
 import static org.apache.beam.sdk.io.gcp.pubsub.TestPubsub.createTopicName;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
@@ -29,7 +30,6 @@ import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
 import org.apache.beam.sdk.state.BagState;
@@ -251,7 +251,7 @@ public class TestPubsubSignal implements TestRule {
       try {
         signal = pubsub.pull(DateTime.now().getMillis(), signalSubscriptionPath, 1, false);
         pubsub.acknowledge(
-            signalSubscriptionPath, signal.stream().map(IncomingMessage::ackId).collect(toList()));
+            signalSubscriptionPath, signal.stream().map(m -> m.ackId).collect(toList()));
         break;
       } catch (StatusRuntimeException e) {
         if (!Status.DEADLINE_EXCEEDED.equals(e.getStatus())) {
@@ -271,7 +271,7 @@ public class TestPubsubSignal implements TestRule {
               signalSubscriptionPath, duration.getStandardSeconds()));
     }
 
-    return signal.get(0).message().getData().toStringUtf8();
+    return new String(signal.get(0).elementBytes, UTF_8);
   }
 
   private void sleep(long t) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index bc146ce..50f6548 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -67,7 +67,6 @@ public class GcpApiSurfaceTest {
             classesInPackage("com.google.cloud.bigtable.config"),
             classesInPackage("com.google.cloud.bigtable.data"),
             classesInPackage("com.google.spanner.v1"),
-            classesInPackage("com.google.pubsub.v1"),
             Matchers.equalTo(com.google.api.gax.rpc.ApiException.class),
             Matchers.<Class<?>>equalTo(com.google.api.gax.longrunning.OperationFuture.class),
             Matchers.<Class<?>>equalTo(com.google.api.gax.longrunning.OperationSnapshot.class),
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
index 4dd719b..7c53170 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
@@ -142,11 +142,11 @@ public class PubsubGrpcClientTest {
       List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
       assertEquals(1, acutalMessages.size());
       IncomingMessage actualMessage = acutalMessages.get(0);
-      assertEquals(ACK_ID, actualMessage.ackId());
-      assertEquals(DATA, actualMessage.message().getData().toStringUtf8());
-      assertEquals(RECORD_ID, actualMessage.recordId());
-      assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch());
-      assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch());
+      assertEquals(ACK_ID, actualMessage.ackId);
+      assertEquals(DATA, new String(actualMessage.elementBytes, StandardCharsets.UTF_8));
+      assertEquals(RECORD_ID, actualMessage.recordId);
+      assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
+      assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
       assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived));
     } finally {
       server.shutdownNow();
@@ -187,13 +187,8 @@ public class PubsubGrpcClientTest {
         InProcessServerBuilder.forName(channelName).addService(publisherImplBase).build().start();
     try {
       OutgoingMessage actualMessage =
-          OutgoingMessage.of(
-              com.google.pubsub.v1.PubsubMessage.newBuilder()
-                  .setData(ByteString.copyFromUtf8(DATA))
-                  .putAllAttributes(ATTRIBUTES)
-                  .build(),
-              MESSAGE_TIME,
-              RECORD_ID);
+          new OutgoingMessage(
+              DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, MESSAGE_TIME, RECORD_ID);
       int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
       assertEquals(1, n);
       assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 0dc910f..65b89a7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -29,7 +29,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 import com.google.api.client.util.Clock;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
@@ -392,10 +391,9 @@ public class PubsubIOTest {
                 })
             .map(
                 ba ->
-                    IncomingMessage.of(
-                        com.google.pubsub.v1.PubsubMessage.newBuilder()
-                            .setData(ByteString.copyFrom(ba))
-                            .build(),
+                    new IncomingMessage(
+                        ba,
+                        null,
                         1234L,
                         0,
                         UUID.randomUUID().toString(),
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
index aad9729..f7fc0f3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -34,7 +34,6 @@ import com.google.api.services.pubsub.model.PullResponse;
 import com.google.api.services.pubsub.model.ReceivedMessage;
 import com.google.api.services.pubsub.model.Subscription;
 import com.google.api.services.pubsub.model.Topic;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
@@ -74,7 +73,6 @@ public class PubsubJsonClientTest {
   private static final String DATA = "testData";
   private static final String RECORD_ID = "testRecordId";
   private static final String ACK_ID = "testAckId";
-  private static final String ORDERING_KEY = "testOrderingKey";
 
   @Before
   public void setup() {
@@ -100,40 +98,7 @@ public class PubsubJsonClientTest {
             .setPublishTime(String.valueOf(PUB_TIME))
             .setAttributes(
                 ImmutableMap.of(
-                    TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME), ID_ATTRIBUTE, RECORD_ID))
-            .set("orderingKey", ORDERING_KEY);
-    ReceivedMessage expectedReceivedMessage =
-        new ReceivedMessage().setMessage(expectedPubsubMessage).setAckId(ACK_ID);
-    PullResponse expectedResponse =
-        new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage));
-    when((Object)
-            (mockPubsub
-                .projects()
-                .subscriptions()
-                .pull(expectedSubscription, expectedRequest)
-                .execute()))
-        .thenReturn(expectedResponse);
-    List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
-    assertEquals(1, acutalMessages.size());
-    IncomingMessage actualMessage = acutalMessages.get(0);
-    assertEquals(ACK_ID, actualMessage.ackId());
-    assertEquals(DATA, actualMessage.message().getData().toStringUtf8());
-    assertEquals(RECORD_ID, actualMessage.recordId());
-    assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch());
-    assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch());
-    assertEquals(ORDERING_KEY, actualMessage.message().getOrderingKey());
-  }
-
-  @Test
-  public void pullOneMessageEmptyAttributes() throws IOException {
-    client = new PubsubJsonClient(null, null, mockPubsub);
-    String expectedSubscription = SUBSCRIPTION.getPath();
-    PullRequest expectedRequest = new PullRequest().setReturnImmediately(true).setMaxMessages(10);
-    PubsubMessage expectedPubsubMessage =
-        new PubsubMessage()
-            .setMessageId(MESSAGE_ID)
-            .encodeData(DATA.getBytes(StandardCharsets.UTF_8))
-            .setPublishTime(String.valueOf(PUB_TIME));
+                    TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME), ID_ATTRIBUTE, RECORD_ID));
     ReceivedMessage expectedReceivedMessage =
         new ReceivedMessage().setMessage(expectedPubsubMessage).setAckId(ACK_ID);
     PullResponse expectedResponse =
@@ -148,10 +113,11 @@ public class PubsubJsonClientTest {
     List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
     assertEquals(1, acutalMessages.size());
     IncomingMessage actualMessage = acutalMessages.get(0);
-    assertEquals(ACK_ID, actualMessage.ackId());
-    assertEquals(DATA, actualMessage.message().getData().toStringUtf8());
-    assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch());
-    assertEquals(PUB_TIME, actualMessage.timestampMsSinceEpoch());
+    assertEquals(ACK_ID, actualMessage.ackId);
+    assertEquals(DATA, new String(actualMessage.elementBytes, StandardCharsets.UTF_8));
+    assertEquals(RECORD_ID, actualMessage.recordId);
+    assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
+    assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
   }
 
   @Test
@@ -180,7 +146,7 @@ public class PubsubJsonClientTest {
     List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
     assertEquals(1, acutalMessages.size());
     IncomingMessage actualMessage = acutalMessages.get(0);
-    assertArrayEquals(new byte[0], actualMessage.message().getData().toByteArray());
+    assertArrayEquals(new byte[0], actualMessage.elementBytes);
   }
 
   @Test
@@ -194,8 +160,7 @@ public class PubsubJsonClientTest {
                     .put(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME))
                     .put(ID_ATTRIBUTE, RECORD_ID)
                     .put("k", "v")
-                    .build())
-            .set("orderingKey", ORDERING_KEY);
+                    .build());
     PublishRequest expectedRequest =
         new PublishRequest().setMessages(ImmutableList.of(expectedPubsubMessage));
     PublishResponse expectedResponse =
@@ -206,14 +171,7 @@ public class PubsubJsonClientTest {
     Map<String, String> attrs = new HashMap<>();
     attrs.put("k", "v");
     OutgoingMessage actualMessage =
-        OutgoingMessage.of(
-            com.google.pubsub.v1.PubsubMessage.newBuilder()
-                .setData(ByteString.copyFromUtf8(DATA))
-                .putAllAttributes(attrs)
-                .setOrderingKey(ORDERING_KEY)
-                .build(),
-            MESSAGE_TIME,
-            RECORD_ID);
+        new OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), attrs, MESSAGE_TIME, RECORD_ID);
     int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
     assertEquals(1, n);
   }
@@ -237,12 +195,8 @@ public class PubsubJsonClientTest {
             (mockPubsub.projects().topics().publish(expectedTopic, expectedRequest).execute()))
         .thenReturn(expectedResponse);
     OutgoingMessage actualMessage =
-        OutgoingMessage.of(
-            com.google.pubsub.v1.PubsubMessage.newBuilder()
-                .setData(ByteString.copyFromUtf8(DATA))
-                .build(),
-            MESSAGE_TIME,
-            RECORD_ID);
+        new OutgoingMessage(
+            DATA.getBytes(StandardCharsets.UTF_8), ImmutableMap.of(), MESSAGE_TIME, RECORD_ID);
     int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
     assertEquals(1, n);
   }
@@ -268,13 +222,7 @@ public class PubsubJsonClientTest {
     Map<String, String> attrs = new HashMap<>();
     attrs.put("k", "v");
     OutgoingMessage actualMessage =
-        OutgoingMessage.of(
-            com.google.pubsub.v1.PubsubMessage.newBuilder()
-                .setData(ByteString.copyFromUtf8(DATA))
-                .putAllAttributes(attrs)
-                .build(),
-            MESSAGE_TIME,
-            RECORD_ID);
+        new OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), attrs, MESSAGE_TIME, RECORD_ID);
     int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
     assertEquals(1, n);
   }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
index 6b920e8..2b698f0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
@@ -20,9 +20,8 @@ package org.apache.beam.sdk.io.gcp.pubsub;
 import static org.junit.Assert.assertEquals;
 
 import com.google.api.client.util.Clock;
-import com.google.protobuf.ByteString;
-import com.google.pubsub.v1.PubsubMessage;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
@@ -55,8 +54,9 @@ public class PubsubTestClientTest {
     final AtomicLong now = new AtomicLong();
     Clock clock = now::get;
     IncomingMessage expectedIncomingMessage =
-        IncomingMessage.of(
-            PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).build(),
+        new IncomingMessage(
+            DATA.getBytes(StandardCharsets.UTF_8),
+            null,
             MESSAGE_TIME,
             REQ_TIME,
             ACK_ID,
@@ -75,14 +75,7 @@ public class PubsubTestClientTest {
         client.advance();
         incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
         assertEquals(1, incomingMessages.size());
-        assertEquals(
-            IncomingMessage.of(
-                expectedIncomingMessage.message(),
-                expectedIncomingMessage.timestampMsSinceEpoch(),
-                now.get(),
-                expectedIncomingMessage.ackId(),
-                expectedIncomingMessage.recordId()),
-            incomingMessages.get(0));
+        assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
         now.addAndGet(10 * 1000);
         client.advance();
         // Extend ack
@@ -92,14 +85,7 @@ public class PubsubTestClientTest {
         client.advance();
         incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
         assertEquals(1, incomingMessages.size());
-        assertEquals(
-            IncomingMessage.of(
-                expectedIncomingMessage.message(),
-                expectedIncomingMessage.timestampMsSinceEpoch(),
-                now.get(),
-                expectedIncomingMessage.ackId(),
-                expectedIncomingMessage.recordId()),
-            incomingMessages.get(0));
+        assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
         // Extend ack
         client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
         // Ack
@@ -113,10 +99,7 @@ public class PubsubTestClientTest {
   @Test
   public void publishOneMessage() throws IOException {
     OutgoingMessage expectedOutgoingMessage =
-        OutgoingMessage.of(
-            PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).build(),
-            MESSAGE_TIME,
-            MESSAGE_ID);
+        new OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), null, MESSAGE_TIME, MESSAGE_ID);
     try (PubsubTestClientFactory factory =
         PubsubTestClient.createFactoryForPublish(
             TOPIC, Sets.newHashSet(expectedOutgoingMessage), ImmutableList.of())) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
index f8cd86e..f588e05 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsub;
 
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
@@ -84,12 +83,8 @@ public class PubsubUnboundedSinkTest implements Serializable {
   @Test
   public void saneCoder() throws Exception {
     OutgoingMessage message =
-        OutgoingMessage.of(
-            com.google.pubsub.v1.PubsubMessage.newBuilder()
-                .setData(ByteString.copyFromUtf8(DATA))
-                .build(),
-            TIMESTAMP,
-            getRecordId(DATA));
+        new OutgoingMessage(
+            DATA.getBytes(StandardCharsets.UTF_8), ImmutableMap.of(), TIMESTAMP, getRecordId(DATA));
     CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message);
     CoderProperties.coderSerializable(PubsubUnboundedSink.CODER);
   }
@@ -98,13 +93,8 @@ public class PubsubUnboundedSinkTest implements Serializable {
   public void sendOneMessage() throws IOException {
     List<OutgoingMessage> outgoing =
         ImmutableList.of(
-            OutgoingMessage.of(
-                com.google.pubsub.v1.PubsubMessage.newBuilder()
-                    .setData(ByteString.copyFromUtf8(DATA))
-                    .putAllAttributes(ATTRIBUTES)
-                    .build(),
-                TIMESTAMP,
-                getRecordId(DATA)));
+            new OutgoingMessage(
+                DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, TIMESTAMP, getRecordId(DATA)));
     int batchSize = 1;
     int batchBytes = 1;
     try (PubsubTestClientFactory factory =
@@ -131,10 +121,9 @@ public class PubsubUnboundedSinkTest implements Serializable {
   public void sendOneMessageWithoutAttributes() throws IOException {
     List<OutgoingMessage> outgoing =
         ImmutableList.of(
-            OutgoingMessage.of(
-                com.google.pubsub.v1.PubsubMessage.newBuilder()
-                    .setData(ByteString.copyFromUtf8(DATA))
-                    .build(),
+            new OutgoingMessage(
+                DATA.getBytes(StandardCharsets.UTF_8),
+                null /* attributes */,
                 TIMESTAMP,
                 getRecordId(DATA)));
     try (PubsubTestClientFactory factory =
@@ -168,10 +157,9 @@ public class PubsubUnboundedSinkTest implements Serializable {
     for (int i = 0; i < batchSize * 10; i++) {
       String str = String.valueOf(i);
       outgoing.add(
-          OutgoingMessage.of(
-              com.google.pubsub.v1.PubsubMessage.newBuilder()
-                  .setData(ByteString.copyFromUtf8(str))
-                  .build(),
+          new OutgoingMessage(
+              str.getBytes(StandardCharsets.UTF_8),
+              ImmutableMap.of(),
               TIMESTAMP,
               getRecordId(str)));
       data.add(str);
@@ -210,10 +198,9 @@ public class PubsubUnboundedSinkTest implements Serializable {
       }
       String str = sb.toString();
       outgoing.add(
-          OutgoingMessage.of(
-              com.google.pubsub.v1.PubsubMessage.newBuilder()
-                  .setData(ByteString.copyFromUtf8(str))
-                  .build(),
+          new OutgoingMessage(
+              str.getBytes(StandardCharsets.UTF_8),
+              ImmutableMap.of(),
               TIMESTAMP,
               getRecordId(str)));
       data.add(str);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
index 43ecbdc..b2dacf0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -31,7 +31,6 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.api.client.util.Clock;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -101,14 +100,8 @@ public class PubsubUnboundedSourceTest {
   private void setupOneMessage() {
     setupOneMessage(
         ImmutableList.of(
-            IncomingMessage.of(
-                com.google.pubsub.v1.PubsubMessage.newBuilder()
-                    .setData(ByteString.copyFromUtf8(DATA))
-                    .build(),
-                TIMESTAMP,
-                0,
-                ACK_ID,
-                RECORD_ID)));
+            new IncomingMessage(
+                DATA.getBytes(StandardCharsets.UTF_8), null, TIMESTAMP, 0, ACK_ID, RECORD_ID)));
   }
 
   @After
@@ -226,14 +219,8 @@ public class PubsubUnboundedSourceTest {
       String data = String.format("data_%d", i);
       String ackid = String.format("ackid_%d", i);
       incoming.add(
-          IncomingMessage.of(
-              com.google.pubsub.v1.PubsubMessage.newBuilder()
-                  .setData(ByteString.copyFromUtf8(data))
-                  .build(),
-              TIMESTAMP,
-              0,
-              ackid,
-              RECORD_ID));
+          new IncomingMessage(
+              data.getBytes(StandardCharsets.UTF_8), null, TIMESTAMP, 0, ackid, RECORD_ID));
     }
     setupOneMessage(incoming);
     PubsubReader reader = primSource.createReader(p.getOptions(), null);
@@ -292,10 +279,9 @@ public class PubsubUnboundedSourceTest {
       String recid = String.format("recordid_%d", messageNum);
       String ackId = String.format("ackid_%d", messageNum);
       incoming.add(
-          IncomingMessage.of(
-              com.google.pubsub.v1.PubsubMessage.newBuilder()
-                  .setData(ByteString.copyFromUtf8(data))
-                  .build(),
+          new IncomingMessage(
+              data.getBytes(StandardCharsets.UTF_8),
+              null,
               messageNumToTimestamp(messageNum),
               0,
               ackId,