You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/01/10 19:22:08 UTC

[beam] 01/01: Revert "[BEAM-8932] [BEAM-9036] Revert reverted commit to use PubsubMessage as the canonical type in beam client (#10474)"

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch revert-10474-master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1b35c617428f76fb9b3723d00829be89e8f70abe
Author: Boyuan Zhang <36...@users.noreply.github.com>
AuthorDate: Fri Jan 10 11:21:58 2020 -0800

    Revert "[BEAM-8932] [BEAM-9036] Revert reverted commit to use PubsubMessage as the canonical type in beam client (#10474)"
    
    This reverts commit 784d18b7ac89f87dd7fbf2861ee877f5b6070276.
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   4 +-
 .../beam/sdk/io/gcp/pubsub/PubsubClient.java       | 139 ++++++++++++++++-----
 .../beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java   |  29 +++--
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java    |   9 +-
 .../beam/sdk/io/gcp/pubsub/PubsubJsonClient.java   |  38 ++----
 .../beam/sdk/io/gcp/pubsub/PubsubTestClient.java   |  11 +-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java     |  23 ++--
 .../sdk/io/gcp/pubsub/PubsubUnboundedSource.java   |  33 +++--
 .../apache/beam/sdk/io/gcp/pubsub/TestPubsub.java  |  20 +--
 .../beam/sdk/io/gcp/pubsub/TestPubsubSignal.java   |   6 +-
 .../apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java  |   1 -
 .../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java    |  19 ++-
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java       |   8 +-
 .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java    |  76 ++---------
 .../sdk/io/gcp/pubsub/PubsubTestClientTest.java    |  31 ++---
 .../sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java |  39 ++----
 .../io/gcp/pubsub/PubsubUnboundedSourceTest.java   |  28 ++---
 17 files changed, 227 insertions(+), 287 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 7e1cf0f..c2d1464 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -365,7 +365,7 @@ class BeamModulePlugin implements Plugin<Project> {
     def cassandra_driver_version = "3.8.0"
     def classgraph_version = "4.8.56"
     def generated_grpc_beta_version = "0.44.0"
-    def generated_grpc_ga_version = "1.83.0"
+    def generated_grpc_ga_version = "1.43.0"
     def generated_grpc_dc_beta_version = "0.27.0-alpha"
     def google_auth_version = "0.19.0"
     def google_clients_version = "1.28.0"
@@ -444,7 +444,7 @@ class BeamModulePlugin implements Plugin<Project> {
         google_api_services_clouddebugger           : "com.google.apis:google-api-services-clouddebugger:v2-rev20181114-$google_clients_version",
         google_api_services_cloudresourcemanager    : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20181015-$google_clients_version",
         google_api_services_dataflow                : "com.google.apis:google-api-services-dataflow:v1b3-rev20190927-$google_clients_version",
-        google_api_services_pubsub                  : "com.google.apis:google-api-services-pubsub:v1-rev20191111-$google_clients_version",
+        google_api_services_pubsub                  : "com.google.apis:google-api-services-pubsub:v1-rev20181213-$google_clients_version",
         google_api_services_storage                 : "com.google.apis:google-api-services-storage:v1-rev20181109-$google_clients_version",
         google_auth_library_credentials             : "com.google.auth:google-auth-library-credentials:$google_auth_version",
         google_auth_library_oauth2_http             : "com.google.auth:google-auth-library-oauth2-http:$google_auth_version",
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index 6f0f54d..07d6da6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -21,12 +21,10 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import com.google.api.client.util.DateTime;
-import com.google.auto.value.AutoValue;
-import com.google.protobuf.ByteString;
-import com.google.pubsub.v1.PubsubMessage;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -300,37 +298,59 @@ public abstract class PubsubClient implements Closeable {
    * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. Java
    * serialization is never used for non-test clients.
    */
-  @AutoValue
-  public abstract static class OutgoingMessage implements Serializable {
+  public static class OutgoingMessage implements Serializable {
+    /** Underlying (encoded) element. */
+    public final byte[] elementBytes;
 
-    /** Underlying Message. May not have publish timestamp set. */
-    public abstract PubsubMessage message();
+    public final Map<String, String> attributes;
 
     /** Timestamp for element (ms since epoch). */
-    public abstract long timestampMsSinceEpoch();
+    public final long timestampMsSinceEpoch;
 
     /**
      * If using an id attribute, the record id to associate with this record's metadata so the
      * receiver can reject duplicates. Otherwise {@literal null}.
      */
-    @Nullable
-    public abstract String recordId();
+    @Nullable public final String recordId;
 
-    public static OutgoingMessage of(
-        PubsubMessage message, long timestampMsSinceEpoch, @Nullable String recordId) {
-      return new AutoValue_PubsubClient_OutgoingMessage(message, timestampMsSinceEpoch, recordId);
-    }
-
-    public static OutgoingMessage of(
-        org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage message,
+    public OutgoingMessage(
+        byte[] elementBytes,
+        Map<String, String> attributes,
         long timestampMsSinceEpoch,
         @Nullable String recordId) {
-      PubsubMessage.Builder builder =
-          PubsubMessage.newBuilder().setData(ByteString.copyFrom(message.getPayload()));
-      if (message.getAttributeMap() != null) {
-        builder.putAllAttributes(message.getAttributeMap());
+      this.elementBytes = elementBytes;
+      this.attributes = attributes;
+      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+      this.recordId = recordId;
+    }
+
+    @Override
+    public String toString() {
+      return String.format(
+          "OutgoingMessage(%db, %dms)", elementBytes.length, timestampMsSinceEpoch);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
       }
-      return of(builder.build(), timestampMsSinceEpoch, recordId);
+
+      OutgoingMessage that = (OutgoingMessage) o;
+
+      return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+          && Arrays.equals(elementBytes, that.elementBytes)
+          && Objects.equal(attributes, that.attributes)
+          && Objects.equal(recordId, that.recordId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(
+          Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch, recordId);
     }
   }
 
@@ -340,35 +360,86 @@ public abstract class PubsubClient implements Closeable {
    * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. Java
    * serialization is never used for non-test clients.
    */
-  @AutoValue
-  abstract static class IncomingMessage implements Serializable {
+  static class IncomingMessage implements Serializable {
+    /** Underlying (encoded) element. */
+    public final byte[] elementBytes;
 
-    /** Underlying Message. */
-    public abstract PubsubMessage message();
+    public Map<String, String> attributes;
 
     /**
      * Timestamp for element (ms since epoch). Either Pubsub's processing time, or the custom
      * timestamp associated with the message.
      */
-    public abstract long timestampMsSinceEpoch();
+    public final long timestampMsSinceEpoch;
 
     /** Timestamp (in system time) at which we requested the message (ms since epoch). */
-    public abstract long requestTimeMsSinceEpoch();
+    public final long requestTimeMsSinceEpoch;
 
     /** Id to pass back to Pubsub to acknowledge receipt of this message. */
-    public abstract String ackId();
+    public final String ackId;
 
     /** Id to pass to the runner to distinguish this message from all others. */
-    public abstract String recordId();
+    public final String recordId;
 
-    public static IncomingMessage of(
-        PubsubMessage message,
+    public IncomingMessage(
+        byte[] elementBytes,
+        Map<String, String> attributes,
         long timestampMsSinceEpoch,
         long requestTimeMsSinceEpoch,
         String ackId,
         String recordId) {
-      return new AutoValue_PubsubClient_IncomingMessage(
-          message, timestampMsSinceEpoch, requestTimeMsSinceEpoch, ackId, recordId);
+      this.elementBytes = elementBytes;
+      this.attributes = attributes;
+      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+      this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
+      this.ackId = ackId;
+      this.recordId = recordId;
+    }
+
+    public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) {
+      return new IncomingMessage(
+          elementBytes,
+          attributes,
+          timestampMsSinceEpoch,
+          requestTimeMsSinceEpoch,
+          ackId,
+          recordId);
+    }
+
+    @Override
+    public String toString() {
+      return String.format(
+          "IncomingMessage(%db, %dms)", elementBytes.length, timestampMsSinceEpoch);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      IncomingMessage that = (IncomingMessage) o;
+
+      return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+          && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch
+          && ackId.equals(that.ackId)
+          && recordId.equals(that.recordId)
+          && Arrays.equals(elementBytes, that.elementBytes)
+          && Objects.equal(attributes, that.attributes);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(
+          Arrays.hashCode(elementBytes),
+          attributes,
+          timestampMsSinceEpoch,
+          requestTimeMsSinceEpoch,
+          ackId,
+          recordId);
     }
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
index a3b6b8d..ae3fa02 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.pubsub;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import com.google.auth.Credentials;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.Timestamp;
 import com.google.pubsub.v1.AcknowledgeRequest;
 import com.google.pubsub.v1.DeleteSubscriptionRequest;
@@ -212,15 +213,21 @@ public class PubsubGrpcClient extends PubsubClient {
   public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
     PublishRequest.Builder request = PublishRequest.newBuilder().setTopic(topic.getPath());
     for (OutgoingMessage outgoingMessage : outgoingMessages) {
-      PubsubMessage.Builder message = outgoingMessage.message().toBuilder();
+      PubsubMessage.Builder message =
+          PubsubMessage.newBuilder().setData(ByteString.copyFrom(outgoingMessage.elementBytes));
+
+      if (outgoingMessage.attributes != null) {
+        message.putAllAttributes(outgoingMessage.attributes);
+      }
 
       if (timestampAttribute != null) {
-        message.putAttributes(
-            timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch()));
+        message
+            .getMutableAttributes()
+            .put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
       }
 
-      if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId())) {
-        message.putAttributes(idAttribute, outgoingMessage.recordId());
+      if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+        message.getMutableAttributes().put(idAttribute, outgoingMessage.recordId);
       }
 
       request.addMessages(message);
@@ -252,6 +259,9 @@ public class PubsubGrpcClient extends PubsubClient {
       PubsubMessage pubsubMessage = message.getMessage();
       @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
 
+      // Payload.
+      byte[] elementBytes = pubsubMessage.getData().toByteArray();
+
       // Timestamp.
       String pubsubTimestampString = null;
       Timestamp timestampProto = pubsubMessage.getPublishTime();
@@ -277,8 +287,13 @@ public class PubsubGrpcClient extends PubsubClient {
       }
 
       incomingMessages.add(
-          IncomingMessage.of(
-              pubsubMessage, timestampMsSinceEpoch, requestTimeMsSinceEpoch, ackId, recordId));
+          new IncomingMessage(
+              elementBytes,
+              attributes,
+              timestampMsSinceEpoch,
+              requestTimeMsSinceEpoch,
+              ackId,
+              recordId));
     }
     return incomingMessages;
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 5f6d044..da5266f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -1303,14 +1303,7 @@ public class PubsubIO {
         }
 
         // NOTE: The record id is always null.
-        output.add(
-            OutgoingMessage.of(
-                com.google.pubsub.v1.PubsubMessage.newBuilder()
-                    .setData(ByteString.copyFrom(payload))
-                    .putAllAttributes(attributes)
-                    .build(),
-                c.timestamp().getMillis(),
-                null));
+        output.add(new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null));
         currentOutputBytes += payload.length;
       }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index 1ae5a55..136b1d2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -39,10 +39,8 @@ import com.google.api.services.pubsub.model.Topic;
 import com.google.auth.Credentials;
 import com.google.auth.http.HttpCredentialsAdapter;
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -125,12 +123,8 @@ public class PubsubJsonClient extends PubsubClient {
   public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
     List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
     for (OutgoingMessage outgoingMessage : outgoingMessages) {
-      PubsubMessage pubsubMessage =
-          new PubsubMessage().encodeData(outgoingMessage.message().getData().toByteArray());
+      PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
       pubsubMessage.setAttributes(getMessageAttributes(outgoingMessage));
-      if (!outgoingMessage.message().getOrderingKey().isEmpty()) {
-        pubsubMessage.put("orderingKey", outgoingMessage.message().getOrderingKey());
-      }
       pubsubMessages.add(pubsubMessage);
     }
     PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
@@ -141,16 +135,16 @@ public class PubsubJsonClient extends PubsubClient {
 
   private Map<String, String> getMessageAttributes(OutgoingMessage outgoingMessage) {
     Map<String, String> attributes = null;
-    if (outgoingMessage.message().getAttributesMap() == null) {
+    if (outgoingMessage.attributes == null) {
       attributes = new TreeMap<>();
     } else {
-      attributes = new TreeMap<>(outgoingMessage.message().getAttributesMap());
+      attributes = new TreeMap<>(outgoingMessage.attributes);
     }
     if (timestampAttribute != null) {
-      attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch()));
+      attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
     }
-    if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId())) {
-      attributes.put(idAttribute, outgoingMessage.recordId());
+    if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+      attributes.put(idAttribute, outgoingMessage.recordId);
     }
     return attributes;
   }
@@ -172,12 +166,7 @@ public class PubsubJsonClient extends PubsubClient {
     List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessages().size());
     for (ReceivedMessage message : response.getReceivedMessages()) {
       PubsubMessage pubsubMessage = message.getMessage();
-      Map<String, String> attributes;
-      if (pubsubMessage.getAttributes() != null) {
-        attributes = pubsubMessage.getAttributes();
-      } else {
-        attributes = new HashMap<>();
-      }
+      @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
 
       // Payload.
       byte[] elementBytes = pubsubMessage.getData() == null ? null : pubsubMessage.decodeData();
@@ -195,7 +184,7 @@ public class PubsubJsonClient extends PubsubClient {
 
       // Record id, if any.
       @Nullable String recordId = null;
-      if (idAttribute != null) {
+      if (idAttribute != null && attributes != null) {
         recordId = attributes.get(idAttribute);
       }
       if (Strings.isNullOrEmpty(recordId)) {
@@ -203,15 +192,10 @@ public class PubsubJsonClient extends PubsubClient {
         recordId = pubsubMessage.getMessageId();
       }
 
-      com.google.pubsub.v1.PubsubMessage.Builder protoMessage =
-          com.google.pubsub.v1.PubsubMessage.newBuilder();
-      protoMessage.setData(ByteString.copyFrom(elementBytes));
-      protoMessage.putAllAttributes(attributes);
-      protoMessage.setOrderingKey(
-          (String) pubsubMessage.getUnknownKeys().getOrDefault("orderingKey", ""));
       incomingMessages.add(
-          IncomingMessage.of(
-              protoMessage.build(),
+          new IncomingMessage(
+              elementBytes,
+              attributes,
               timestampMsSinceEpoch,
               requestTimeMsSinceEpoch,
               ackId,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
index c3b915d..6b20b56 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
@@ -309,17 +309,12 @@ public class PubsubTestClient extends PubsubClient implements Serializable {
         IncomingMessage incomingMessage = pendItr.next();
         pendItr.remove();
         IncomingMessage incomingMessageWithRequestTime =
-            IncomingMessage.of(
-                incomingMessage.message(),
-                incomingMessage.timestampMsSinceEpoch(),
-                requestTimeMsSinceEpoch,
-                incomingMessage.ackId(),
-                incomingMessage.recordId());
+            incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
         incomingMessages.add(incomingMessageWithRequestTime);
         STATE.pendingAckIncomingMessages.put(
-            incomingMessageWithRequestTime.ackId(), incomingMessageWithRequestTime);
+            incomingMessageWithRequestTime.ackId, incomingMessageWithRequestTime);
         STATE.ackDeadline.put(
-            incomingMessageWithRequestTime.ackId(),
+            incomingMessageWithRequestTime.ackId,
             requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000);
         if (incomingMessages.size() >= batchSize) {
           break;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 8be8c56..1258d0b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -37,7 +38,6 @@ import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
@@ -101,18 +101,19 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
     @Override
     public void encode(OutgoingMessage value, OutputStream outStream)
         throws CoderException, IOException {
-      ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).encode(value.message(), outStream);
-      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch(), outStream);
-      RECORD_ID_CODER.encode(value.recordId(), outStream);
+      ByteArrayCoder.of().encode(value.elementBytes, outStream);
+      ATTRIBUTES_CODER.encode(value.attributes, outStream);
+      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream);
+      RECORD_ID_CODER.encode(value.recordId, outStream);
     }
 
     @Override
     public OutgoingMessage decode(InputStream inStream) throws CoderException, IOException {
-      com.google.pubsub.v1.PubsubMessage message =
-          ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).decode(inStream);
+      byte[] elementBytes = ByteArrayCoder.of().decode(inStream);
+      Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream);
       long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream);
       @Nullable String recordId = RECORD_ID_CODER.decode(inStream);
-      return OutgoingMessage.of(message, timestampMsSinceEpoch, recordId);
+      return new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId);
     }
   }
 
@@ -153,6 +154,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
       elementCounter.inc();
       PubsubMessage message = c.element();
       byte[] elementBytes = message.getPayload();
+      Map<String, String> attributes = message.getAttributeMap();
 
       long timestampMsSinceEpoch = c.timestamp().getMillis();
       @Nullable String recordId = null;
@@ -173,7 +175,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
       c.output(
           KV.of(
               ThreadLocalRandom.current().nextInt(numShards),
-              OutgoingMessage.of(message, timestampMsSinceEpoch, recordId)));
+              new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId)));
     }
 
     @Override
@@ -244,8 +246,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
       List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize);
       int bytes = 0;
       for (OutgoingMessage message : c.element().getValue()) {
-        if (!pubsubMessages.isEmpty()
-            && bytes + message.message().getData().size() > publishBatchBytes) {
+        if (!pubsubMessages.isEmpty() && bytes + message.elementBytes.length > publishBatchBytes) {
           // Break large (in bytes) batches into smaller.
           // (We've already broken by batch size using the trigger below, though that may
           // run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since
@@ -256,7 +257,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
           bytes = 0;
         }
         pubsubMessages.add(message);
-        bytes += message.message().getData().size();
+        bytes += message.elementBytes.length;
       }
       if (!pubsubMessages.isEmpty()) {
         // BLOCKS until published.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 230161c..d8abfe1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -727,18 +727,18 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
       // Capture the received messages.
       for (PubsubClient.IncomingMessage incomingMessage : receivedMessages) {
         notYetRead.add(incomingMessage);
-        notYetReadBytes += incomingMessage.message().getData().size();
+        notYetReadBytes += incomingMessage.elementBytes.length;
         inFlight.put(
-            incomingMessage.ackId(),
+            incomingMessage.ackId,
             new InFlightState(requestTimeMsSinceEpoch, deadlineMsSinceEpoch));
         numReceived++;
         numReceivedRecently.add(requestTimeMsSinceEpoch, 1L);
         minReceivedTimestampMsSinceEpoch.add(
-            requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch());
+            requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch);
         maxReceivedTimestampMsSinceEpoch.add(
-            requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch());
+            requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch);
         minUnreadTimestampMsSinceEpoch.add(
-            requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch());
+            requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch);
       }
     }
 
@@ -837,7 +837,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
 
       if (current != null) {
         // Current is consumed. It can no longer contribute to holding back the watermark.
-        minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch());
+        minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch);
         current = null;
       }
 
@@ -864,18 +864,18 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
         // Try again later.
         return false;
       }
-      notYetReadBytes -= current.message().getData().size();
+      notYetReadBytes -= current.elementBytes.length;
       checkState(notYetReadBytes >= 0);
       long nowMsSinceEpoch = now();
-      numReadBytes.add(nowMsSinceEpoch, current.message().getData().size());
-      minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, current.timestampMsSinceEpoch());
-      if (current.timestampMsSinceEpoch() < lastWatermarkMsSinceEpoch) {
+      numReadBytes.add(nowMsSinceEpoch, current.elementBytes.length);
+      minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, current.timestampMsSinceEpoch);
+      if (current.timestampMsSinceEpoch < lastWatermarkMsSinceEpoch) {
         numLateMessages.add(nowMsSinceEpoch, 1L);
       }
 
       // Current message can be considered 'read' and will be persisted by the next
       // checkpoint. So it is now safe to ACK back to Pubsub.
-      safeToAckIds.add(current.ackId());
+      safeToAckIds.add(current.ackId);
       return true;
     }
 
@@ -884,10 +884,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
       if (current == null) {
         throw new NoSuchElementException();
       }
-      return new PubsubMessage(
-          current.message().getData().toByteArray(),
-          current.message().getAttributesMap(),
-          current.recordId());
+      return new PubsubMessage(current.elementBytes, current.attributes, current.recordId);
     }
 
     @Override
@@ -895,7 +892,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
       if (current == null) {
         throw new NoSuchElementException();
       }
-      return new Instant(current.timestampMsSinceEpoch());
+      return new Instant(current.timestampMsSinceEpoch);
     }
 
     @Override
@@ -903,7 +900,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
       if (current == null) {
         throw new NoSuchElementException();
       }
-      return current.recordId().getBytes(StandardCharsets.UTF_8);
+      return current.recordId.getBytes(StandardCharsets.UTF_8);
     }
 
     /**
@@ -987,7 +984,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
       List<String> snapshotSafeToAckIds = Lists.newArrayList(safeToAckIds);
       List<String> snapshotNotYetReadIds = new ArrayList<>(notYetRead.size());
       for (PubsubClient.IncomingMessage incomingMessage : notYetRead) {
-        snapshotNotYetReadIds.add(incomingMessage.ackId());
+        snapshotNotYetReadIds.add(incomingMessage.ackId);
       }
       if (outer.subscriptionPath == null) {
         // need to include the subscription in case we resume, as it's not stored in the source.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
index e1e8711..1e75d43 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
@@ -22,14 +22,12 @@ import static org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.projectPathFromPath
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
@@ -207,16 +205,11 @@ public class TestPubsub implements TestRule {
     if (!messages.isEmpty()) {
       pubsub.acknowledge(
           subscriptionPath,
-          messages.stream().map(IncomingMessage::ackId).collect(ImmutableList.toImmutableList()));
+          messages.stream().map(msg -> msg.ackId).collect(ImmutableList.toImmutableList()));
     }
 
     return messages.stream()
-        .map(
-            msg ->
-                new PubsubMessage(
-                    msg.message().getData().toByteArray(),
-                    msg.message().getAttributesMap(),
-                    msg.recordId()))
+        .map(msg -> new PubsubMessage(msg.elementBytes, msg.attributes, msg.recordId))
         .collect(ImmutableList.toImmutableList());
   }
 
@@ -299,12 +292,7 @@ public class TestPubsub implements TestRule {
   }
 
   private PubsubClient.OutgoingMessage toOutgoingMessage(PubsubMessage message) {
-    return PubsubClient.OutgoingMessage.of(
-        com.google.pubsub.v1.PubsubMessage.newBuilder()
-            .setData(ByteString.copyFrom(message.getPayload()))
-            .putAllAttributes(message.getAttributeMap())
-            .build(),
-        DateTime.now().getMillis(),
-        null);
+    return new PubsubClient.OutgoingMessage(
+        message.getPayload(), message.getAttributeMap(), DateTime.now().getMillis(), null);
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
index de4a715..f4f8b18 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsub;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.stream.Collectors.toList;
 import static org.apache.beam.sdk.io.gcp.pubsub.TestPubsub.createTopicName;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
@@ -29,7 +30,6 @@ import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
 import org.apache.beam.sdk.state.BagState;
@@ -251,7 +251,7 @@ public class TestPubsubSignal implements TestRule {
       try {
         signal = pubsub.pull(DateTime.now().getMillis(), signalSubscriptionPath, 1, false);
         pubsub.acknowledge(
-            signalSubscriptionPath, signal.stream().map(IncomingMessage::ackId).collect(toList()));
+            signalSubscriptionPath, signal.stream().map(m -> m.ackId).collect(toList()));
         break;
       } catch (StatusRuntimeException e) {
         if (!Status.DEADLINE_EXCEEDED.equals(e.getStatus())) {
@@ -271,7 +271,7 @@ public class TestPubsubSignal implements TestRule {
               signalSubscriptionPath, duration.getStandardSeconds()));
     }
 
-    return signal.get(0).message().getData().toStringUtf8();
+    return new String(signal.get(0).elementBytes, UTF_8);
   }
 
   private void sleep(long t) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index bc146ce..50f6548 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -67,7 +67,6 @@ public class GcpApiSurfaceTest {
             classesInPackage("com.google.cloud.bigtable.config"),
             classesInPackage("com.google.cloud.bigtable.data"),
             classesInPackage("com.google.spanner.v1"),
-            classesInPackage("com.google.pubsub.v1"),
             Matchers.equalTo(com.google.api.gax.rpc.ApiException.class),
             Matchers.<Class<?>>equalTo(com.google.api.gax.longrunning.OperationFuture.class),
             Matchers.<Class<?>>equalTo(com.google.api.gax.longrunning.OperationSnapshot.class),
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
index 4dd719b..7c53170 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
@@ -142,11 +142,11 @@ public class PubsubGrpcClientTest {
       List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
       assertEquals(1, acutalMessages.size());
       IncomingMessage actualMessage = acutalMessages.get(0);
-      assertEquals(ACK_ID, actualMessage.ackId());
-      assertEquals(DATA, actualMessage.message().getData().toStringUtf8());
-      assertEquals(RECORD_ID, actualMessage.recordId());
-      assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch());
-      assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch());
+      assertEquals(ACK_ID, actualMessage.ackId);
+      assertEquals(DATA, new String(actualMessage.elementBytes, StandardCharsets.UTF_8));
+      assertEquals(RECORD_ID, actualMessage.recordId);
+      assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
+      assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
       assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived));
     } finally {
       server.shutdownNow();
@@ -187,13 +187,8 @@ public class PubsubGrpcClientTest {
         InProcessServerBuilder.forName(channelName).addService(publisherImplBase).build().start();
     try {
       OutgoingMessage actualMessage =
-          OutgoingMessage.of(
-              com.google.pubsub.v1.PubsubMessage.newBuilder()
-                  .setData(ByteString.copyFromUtf8(DATA))
-                  .putAllAttributes(ATTRIBUTES)
-                  .build(),
-              MESSAGE_TIME,
-              RECORD_ID);
+          new OutgoingMessage(
+              DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, MESSAGE_TIME, RECORD_ID);
       int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
       assertEquals(1, n);
       assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 0dc910f..65b89a7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -29,7 +29,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 import com.google.api.client.util.Clock;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
@@ -392,10 +391,9 @@ public class PubsubIOTest {
                 })
             .map(
                 ba ->
-                    IncomingMessage.of(
-                        com.google.pubsub.v1.PubsubMessage.newBuilder()
-                            .setData(ByteString.copyFrom(ba))
-                            .build(),
+                    new IncomingMessage(
+                        ba,
+                        null,
                         1234L,
                         0,
                         UUID.randomUUID().toString(),
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
index aad9729..f7fc0f3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -34,7 +34,6 @@ import com.google.api.services.pubsub.model.PullResponse;
 import com.google.api.services.pubsub.model.ReceivedMessage;
 import com.google.api.services.pubsub.model.Subscription;
 import com.google.api.services.pubsub.model.Topic;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
@@ -74,7 +73,6 @@ public class PubsubJsonClientTest {
   private static final String DATA = "testData";
   private static final String RECORD_ID = "testRecordId";
   private static final String ACK_ID = "testAckId";
-  private static final String ORDERING_KEY = "testOrderingKey";
 
   @Before
   public void setup() {
@@ -100,40 +98,7 @@ public class PubsubJsonClientTest {
             .setPublishTime(String.valueOf(PUB_TIME))
             .setAttributes(
                 ImmutableMap.of(
-                    TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME), ID_ATTRIBUTE, RECORD_ID))
-            .set("orderingKey", ORDERING_KEY);
-    ReceivedMessage expectedReceivedMessage =
-        new ReceivedMessage().setMessage(expectedPubsubMessage).setAckId(ACK_ID);
-    PullResponse expectedResponse =
-        new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage));
-    when((Object)
-            (mockPubsub
-                .projects()
-                .subscriptions()
-                .pull(expectedSubscription, expectedRequest)
-                .execute()))
-        .thenReturn(expectedResponse);
-    List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
-    assertEquals(1, acutalMessages.size());
-    IncomingMessage actualMessage = acutalMessages.get(0);
-    assertEquals(ACK_ID, actualMessage.ackId());
-    assertEquals(DATA, actualMessage.message().getData().toStringUtf8());
-    assertEquals(RECORD_ID, actualMessage.recordId());
-    assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch());
-    assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch());
-    assertEquals(ORDERING_KEY, actualMessage.message().getOrderingKey());
-  }
-
-  @Test
-  public void pullOneMessageEmptyAttributes() throws IOException {
-    client = new PubsubJsonClient(null, null, mockPubsub);
-    String expectedSubscription = SUBSCRIPTION.getPath();
-    PullRequest expectedRequest = new PullRequest().setReturnImmediately(true).setMaxMessages(10);
-    PubsubMessage expectedPubsubMessage =
-        new PubsubMessage()
-            .setMessageId(MESSAGE_ID)
-            .encodeData(DATA.getBytes(StandardCharsets.UTF_8))
-            .setPublishTime(String.valueOf(PUB_TIME));
+                    TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME), ID_ATTRIBUTE, RECORD_ID));
     ReceivedMessage expectedReceivedMessage =
         new ReceivedMessage().setMessage(expectedPubsubMessage).setAckId(ACK_ID);
     PullResponse expectedResponse =
@@ -148,10 +113,11 @@ public class PubsubJsonClientTest {
     List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
     assertEquals(1, acutalMessages.size());
     IncomingMessage actualMessage = acutalMessages.get(0);
-    assertEquals(ACK_ID, actualMessage.ackId());
-    assertEquals(DATA, actualMessage.message().getData().toStringUtf8());
-    assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch());
-    assertEquals(PUB_TIME, actualMessage.timestampMsSinceEpoch());
+    assertEquals(ACK_ID, actualMessage.ackId);
+    assertEquals(DATA, new String(actualMessage.elementBytes, StandardCharsets.UTF_8));
+    assertEquals(RECORD_ID, actualMessage.recordId);
+    assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
+    assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
   }
 
   @Test
@@ -180,7 +146,7 @@ public class PubsubJsonClientTest {
     List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
     assertEquals(1, acutalMessages.size());
     IncomingMessage actualMessage = acutalMessages.get(0);
-    assertArrayEquals(new byte[0], actualMessage.message().getData().toByteArray());
+    assertArrayEquals(new byte[0], actualMessage.elementBytes);
   }
 
   @Test
@@ -194,8 +160,7 @@ public class PubsubJsonClientTest {
                     .put(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME))
                     .put(ID_ATTRIBUTE, RECORD_ID)
                     .put("k", "v")
-                    .build())
-            .set("orderingKey", ORDERING_KEY);
+                    .build());
     PublishRequest expectedRequest =
         new PublishRequest().setMessages(ImmutableList.of(expectedPubsubMessage));
     PublishResponse expectedResponse =
@@ -206,14 +171,7 @@ public class PubsubJsonClientTest {
     Map<String, String> attrs = new HashMap<>();
     attrs.put("k", "v");
     OutgoingMessage actualMessage =
-        OutgoingMessage.of(
-            com.google.pubsub.v1.PubsubMessage.newBuilder()
-                .setData(ByteString.copyFromUtf8(DATA))
-                .putAllAttributes(attrs)
-                .setOrderingKey(ORDERING_KEY)
-                .build(),
-            MESSAGE_TIME,
-            RECORD_ID);
+        new OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), attrs, MESSAGE_TIME, RECORD_ID);
     int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
     assertEquals(1, n);
   }
@@ -237,12 +195,8 @@ public class PubsubJsonClientTest {
             (mockPubsub.projects().topics().publish(expectedTopic, expectedRequest).execute()))
         .thenReturn(expectedResponse);
     OutgoingMessage actualMessage =
-        OutgoingMessage.of(
-            com.google.pubsub.v1.PubsubMessage.newBuilder()
-                .setData(ByteString.copyFromUtf8(DATA))
-                .build(),
-            MESSAGE_TIME,
-            RECORD_ID);
+        new OutgoingMessage(
+            DATA.getBytes(StandardCharsets.UTF_8), ImmutableMap.of(), MESSAGE_TIME, RECORD_ID);
     int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
     assertEquals(1, n);
   }
@@ -268,13 +222,7 @@ public class PubsubJsonClientTest {
     Map<String, String> attrs = new HashMap<>();
     attrs.put("k", "v");
     OutgoingMessage actualMessage =
-        OutgoingMessage.of(
-            com.google.pubsub.v1.PubsubMessage.newBuilder()
-                .setData(ByteString.copyFromUtf8(DATA))
-                .putAllAttributes(attrs)
-                .build(),
-            MESSAGE_TIME,
-            RECORD_ID);
+        new OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), attrs, MESSAGE_TIME, RECORD_ID);
     int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
     assertEquals(1, n);
   }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
index 6b920e8..2b698f0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
@@ -20,9 +20,8 @@ package org.apache.beam.sdk.io.gcp.pubsub;
 import static org.junit.Assert.assertEquals;
 
 import com.google.api.client.util.Clock;
-import com.google.protobuf.ByteString;
-import com.google.pubsub.v1.PubsubMessage;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
@@ -55,8 +54,9 @@ public class PubsubTestClientTest {
     final AtomicLong now = new AtomicLong();
     Clock clock = now::get;
     IncomingMessage expectedIncomingMessage =
-        IncomingMessage.of(
-            PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).build(),
+        new IncomingMessage(
+            DATA.getBytes(StandardCharsets.UTF_8),
+            null,
             MESSAGE_TIME,
             REQ_TIME,
             ACK_ID,
@@ -75,14 +75,7 @@ public class PubsubTestClientTest {
         client.advance();
         incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
         assertEquals(1, incomingMessages.size());
-        assertEquals(
-            IncomingMessage.of(
-                expectedIncomingMessage.message(),
-                expectedIncomingMessage.timestampMsSinceEpoch(),
-                now.get(),
-                expectedIncomingMessage.ackId(),
-                expectedIncomingMessage.recordId()),
-            incomingMessages.get(0));
+        assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
         now.addAndGet(10 * 1000);
         client.advance();
         // Extend ack
@@ -92,14 +85,7 @@ public class PubsubTestClientTest {
         client.advance();
         incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
         assertEquals(1, incomingMessages.size());
-        assertEquals(
-            IncomingMessage.of(
-                expectedIncomingMessage.message(),
-                expectedIncomingMessage.timestampMsSinceEpoch(),
-                now.get(),
-                expectedIncomingMessage.ackId(),
-                expectedIncomingMessage.recordId()),
-            incomingMessages.get(0));
+        assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
         // Extend ack
         client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
         // Ack
@@ -113,10 +99,7 @@ public class PubsubTestClientTest {
   @Test
   public void publishOneMessage() throws IOException {
     OutgoingMessage expectedOutgoingMessage =
-        OutgoingMessage.of(
-            PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).build(),
-            MESSAGE_TIME,
-            MESSAGE_ID);
+        new OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), null, MESSAGE_TIME, MESSAGE_ID);
     try (PubsubTestClientFactory factory =
         PubsubTestClient.createFactoryForPublish(
             TOPIC, Sets.newHashSet(expectedOutgoingMessage), ImmutableList.of())) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
index f8cd86e..f588e05 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsub;
 
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
@@ -84,12 +83,8 @@ public class PubsubUnboundedSinkTest implements Serializable {
   @Test
   public void saneCoder() throws Exception {
     OutgoingMessage message =
-        OutgoingMessage.of(
-            com.google.pubsub.v1.PubsubMessage.newBuilder()
-                .setData(ByteString.copyFromUtf8(DATA))
-                .build(),
-            TIMESTAMP,
-            getRecordId(DATA));
+        new OutgoingMessage(
+            DATA.getBytes(StandardCharsets.UTF_8), ImmutableMap.of(), TIMESTAMP, getRecordId(DATA));
     CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message);
     CoderProperties.coderSerializable(PubsubUnboundedSink.CODER);
   }
@@ -98,13 +93,8 @@ public class PubsubUnboundedSinkTest implements Serializable {
   public void sendOneMessage() throws IOException {
     List<OutgoingMessage> outgoing =
         ImmutableList.of(
-            OutgoingMessage.of(
-                com.google.pubsub.v1.PubsubMessage.newBuilder()
-                    .setData(ByteString.copyFromUtf8(DATA))
-                    .putAllAttributes(ATTRIBUTES)
-                    .build(),
-                TIMESTAMP,
-                getRecordId(DATA)));
+            new OutgoingMessage(
+                DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, TIMESTAMP, getRecordId(DATA)));
     int batchSize = 1;
     int batchBytes = 1;
     try (PubsubTestClientFactory factory =
@@ -131,10 +121,9 @@ public class PubsubUnboundedSinkTest implements Serializable {
   public void sendOneMessageWithoutAttributes() throws IOException {
     List<OutgoingMessage> outgoing =
         ImmutableList.of(
-            OutgoingMessage.of(
-                com.google.pubsub.v1.PubsubMessage.newBuilder()
-                    .setData(ByteString.copyFromUtf8(DATA))
-                    .build(),
+            new OutgoingMessage(
+                DATA.getBytes(StandardCharsets.UTF_8),
+                null /* attributes */,
                 TIMESTAMP,
                 getRecordId(DATA)));
     try (PubsubTestClientFactory factory =
@@ -168,10 +157,9 @@ public class PubsubUnboundedSinkTest implements Serializable {
     for (int i = 0; i < batchSize * 10; i++) {
       String str = String.valueOf(i);
       outgoing.add(
-          OutgoingMessage.of(
-              com.google.pubsub.v1.PubsubMessage.newBuilder()
-                  .setData(ByteString.copyFromUtf8(str))
-                  .build(),
+          new OutgoingMessage(
+              str.getBytes(StandardCharsets.UTF_8),
+              ImmutableMap.of(),
               TIMESTAMP,
               getRecordId(str)));
       data.add(str);
@@ -210,10 +198,9 @@ public class PubsubUnboundedSinkTest implements Serializable {
       }
       String str = sb.toString();
       outgoing.add(
-          OutgoingMessage.of(
-              com.google.pubsub.v1.PubsubMessage.newBuilder()
-                  .setData(ByteString.copyFromUtf8(str))
-                  .build(),
+          new OutgoingMessage(
+              str.getBytes(StandardCharsets.UTF_8),
+              ImmutableMap.of(),
               TIMESTAMP,
               getRecordId(str)));
       data.add(str);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
index 43ecbdc..b2dacf0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -31,7 +31,6 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.api.client.util.Clock;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -101,14 +100,8 @@ public class PubsubUnboundedSourceTest {
   private void setupOneMessage() {
     setupOneMessage(
         ImmutableList.of(
-            IncomingMessage.of(
-                com.google.pubsub.v1.PubsubMessage.newBuilder()
-                    .setData(ByteString.copyFromUtf8(DATA))
-                    .build(),
-                TIMESTAMP,
-                0,
-                ACK_ID,
-                RECORD_ID)));
+            new IncomingMessage(
+                DATA.getBytes(StandardCharsets.UTF_8), null, TIMESTAMP, 0, ACK_ID, RECORD_ID)));
   }
 
   @After
@@ -226,14 +219,8 @@ public class PubsubUnboundedSourceTest {
       String data = String.format("data_%d", i);
       String ackid = String.format("ackid_%d", i);
       incoming.add(
-          IncomingMessage.of(
-              com.google.pubsub.v1.PubsubMessage.newBuilder()
-                  .setData(ByteString.copyFromUtf8(data))
-                  .build(),
-              TIMESTAMP,
-              0,
-              ackid,
-              RECORD_ID));
+          new IncomingMessage(
+              data.getBytes(StandardCharsets.UTF_8), null, TIMESTAMP, 0, ackid, RECORD_ID));
     }
     setupOneMessage(incoming);
     PubsubReader reader = primSource.createReader(p.getOptions(), null);
@@ -292,10 +279,9 @@ public class PubsubUnboundedSourceTest {
       String recid = String.format("recordid_%d", messageNum);
       String ackId = String.format("ackid_%d", messageNum);
       incoming.add(
-          IncomingMessage.of(
-              com.google.pubsub.v1.PubsubMessage.newBuilder()
-                  .setData(ByteString.copyFromUtf8(data))
-                  .build(),
+          new IncomingMessage(
+              data.getBytes(StandardCharsets.UTF_8),
+              null,
               messageNumToTimestamp(messageNum),
               0,
               ackId,