You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2019/12/27 18:39:41 UTC
[beam] 01/01: Revert "[BEAM-8932] Modify PubsubClient to use the
proto message throughout."
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch revert-10331-master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit aeae2f417629c374cb025faa3a664a9646859a01
Author: Boyuan Zhang <36...@users.noreply.github.com>
AuthorDate: Fri Dec 27 10:39:26 2019 -0800
Revert "[BEAM-8932] Modify PubsubClient to use the proto message throughout."
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 6 +-
.../beam/sdk/io/gcp/pubsub/PubsubClient.java | 139 ++++++++++++++++-----
.../beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java | 29 +++--
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 9 +-
.../beam/sdk/io/gcp/pubsub/PubsubJsonClient.java | 28 ++---
.../beam/sdk/io/gcp/pubsub/PubsubTestClient.java | 11 +-
.../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 23 ++--
.../sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 33 +++--
.../apache/beam/sdk/io/gcp/pubsub/TestPubsub.java | 20 +--
.../beam/sdk/io/gcp/pubsub/TestPubsubSignal.java | 6 +-
.../apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java | 1 -
.../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java | 19 ++-
.../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 8 +-
.../sdk/io/gcp/pubsub/PubsubJsonClientTest.java | 46 ++-----
.../sdk/io/gcp/pubsub/PubsubTestClientTest.java | 31 ++---
.../sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java | 39 ++----
.../io/gcp/pubsub/PubsubUnboundedSourceTest.java | 28 ++---
17 files changed, 226 insertions(+), 250 deletions(-)
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 7bdf719..f3ba610 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -365,7 +365,7 @@ class BeamModulePlugin implements Plugin<Project> {
def cassandra_driver_version = "3.6.0"
def classgraph_version = "4.8.56"
def generated_grpc_beta_version = "0.44.0"
- def generated_grpc_ga_version = "1.83.0"
+ def generated_grpc_ga_version = "1.43.0"
def generated_grpc_dc_beta_version = "0.27.0-alpha"
def google_auth_version = "0.12.0"
def google_clients_version = "1.28.0"
@@ -384,7 +384,7 @@ class BeamModulePlugin implements Plugin<Project> {
def postgres_version = "42.2.2"
def powermock_version = "2.0.2"
def proto_google_common_protos_version = "1.17.0"
- def protobuf_version = "3.11.0"
+ def protobuf_version = "3.6.0"
def quickcheck_version = "0.8"
def spark_version = "2.4.4"
def spark_structured_streaming_version = "2.4.0"
@@ -445,7 +445,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_api_services_clouddebugger : "com.google.apis:google-api-services-clouddebugger:v2-rev20181114-$google_clients_version",
google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20181015-$google_clients_version",
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20190927-$google_clients_version",
- google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20191111-$google_clients_version",
+ google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20181213-$google_clients_version",
google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20181109-$google_clients_version",
google_auth_library_credentials : "com.google.auth:google-auth-library-credentials:$google_auth_version",
google_auth_library_oauth2_http : "com.google.auth:google-auth-library-oauth2-http:$google_auth_version",
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index 6f0f54d..07d6da6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -21,12 +21,10 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import com.google.api.client.util.DateTime;
-import com.google.auto.value.AutoValue;
-import com.google.protobuf.ByteString;
-import com.google.pubsub.v1.PubsubMessage;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -300,37 +298,59 @@ public abstract class PubsubClient implements Closeable {
* <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. Java
* serialization is never used for non-test clients.
*/
- @AutoValue
- public abstract static class OutgoingMessage implements Serializable {
+ public static class OutgoingMessage implements Serializable {
+ /** Underlying (encoded) element. */
+ public final byte[] elementBytes;
- /** Underlying Message. May not have publish timestamp set. */
- public abstract PubsubMessage message();
+ public final Map<String, String> attributes;
/** Timestamp for element (ms since epoch). */
- public abstract long timestampMsSinceEpoch();
+ public final long timestampMsSinceEpoch;
/**
* If using an id attribute, the record id to associate with this record's metadata so the
* receiver can reject duplicates. Otherwise {@literal null}.
*/
- @Nullable
- public abstract String recordId();
+ @Nullable public final String recordId;
- public static OutgoingMessage of(
- PubsubMessage message, long timestampMsSinceEpoch, @Nullable String recordId) {
- return new AutoValue_PubsubClient_OutgoingMessage(message, timestampMsSinceEpoch, recordId);
- }
-
- public static OutgoingMessage of(
- org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage message,
+ public OutgoingMessage(
+ byte[] elementBytes,
+ Map<String, String> attributes,
long timestampMsSinceEpoch,
@Nullable String recordId) {
- PubsubMessage.Builder builder =
- PubsubMessage.newBuilder().setData(ByteString.copyFrom(message.getPayload()));
- if (message.getAttributeMap() != null) {
- builder.putAllAttributes(message.getAttributeMap());
+ this.elementBytes = elementBytes;
+ this.attributes = attributes;
+ this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+ this.recordId = recordId;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "OutgoingMessage(%db, %dms)", elementBytes.length, timestampMsSinceEpoch);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
}
- return of(builder.build(), timestampMsSinceEpoch, recordId);
+
+ OutgoingMessage that = (OutgoingMessage) o;
+
+ return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+ && Arrays.equals(elementBytes, that.elementBytes)
+ && Objects.equal(attributes, that.attributes)
+ && Objects.equal(recordId, that.recordId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(
+ Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch, recordId);
}
}
@@ -340,35 +360,86 @@ public abstract class PubsubClient implements Closeable {
* <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. Java
* serialization is never used for non-test clients.
*/
- @AutoValue
- abstract static class IncomingMessage implements Serializable {
+ static class IncomingMessage implements Serializable {
+ /** Underlying (encoded) element. */
+ public final byte[] elementBytes;
- /** Underlying Message. */
- public abstract PubsubMessage message();
+ public Map<String, String> attributes;
/**
* Timestamp for element (ms since epoch). Either Pubsub's processing time, or the custom
* timestamp associated with the message.
*/
- public abstract long timestampMsSinceEpoch();
+ public final long timestampMsSinceEpoch;
/** Timestamp (in system time) at which we requested the message (ms since epoch). */
- public abstract long requestTimeMsSinceEpoch();
+ public final long requestTimeMsSinceEpoch;
/** Id to pass back to Pubsub to acknowledge receipt of this message. */
- public abstract String ackId();
+ public final String ackId;
/** Id to pass to the runner to distinguish this message from all others. */
- public abstract String recordId();
+ public final String recordId;
- public static IncomingMessage of(
- PubsubMessage message,
+ public IncomingMessage(
+ byte[] elementBytes,
+ Map<String, String> attributes,
long timestampMsSinceEpoch,
long requestTimeMsSinceEpoch,
String ackId,
String recordId) {
- return new AutoValue_PubsubClient_IncomingMessage(
- message, timestampMsSinceEpoch, requestTimeMsSinceEpoch, ackId, recordId);
+ this.elementBytes = elementBytes;
+ this.attributes = attributes;
+ this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+ this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
+ this.ackId = ackId;
+ this.recordId = recordId;
+ }
+
+ public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) {
+ return new IncomingMessage(
+ elementBytes,
+ attributes,
+ timestampMsSinceEpoch,
+ requestTimeMsSinceEpoch,
+ ackId,
+ recordId);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "IncomingMessage(%db, %dms)", elementBytes.length, timestampMsSinceEpoch);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ IncomingMessage that = (IncomingMessage) o;
+
+ return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+ && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch
+ && ackId.equals(that.ackId)
+ && recordId.equals(that.recordId)
+ && Arrays.equals(elementBytes, that.elementBytes)
+ && Objects.equal(attributes, that.attributes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(
+ Arrays.hashCode(elementBytes),
+ attributes,
+ timestampMsSinceEpoch,
+ requestTimeMsSinceEpoch,
+ ackId,
+ recordId);
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
index a3b6b8d..ae3fa02 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.pubsub;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import com.google.auth.Credentials;
+import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.DeleteSubscriptionRequest;
@@ -212,15 +213,21 @@ public class PubsubGrpcClient extends PubsubClient {
public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
PublishRequest.Builder request = PublishRequest.newBuilder().setTopic(topic.getPath());
for (OutgoingMessage outgoingMessage : outgoingMessages) {
- PubsubMessage.Builder message = outgoingMessage.message().toBuilder();
+ PubsubMessage.Builder message =
+ PubsubMessage.newBuilder().setData(ByteString.copyFrom(outgoingMessage.elementBytes));
+
+ if (outgoingMessage.attributes != null) {
+ message.putAllAttributes(outgoingMessage.attributes);
+ }
if (timestampAttribute != null) {
- message.putAttributes(
- timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch()));
+ message
+ .getMutableAttributes()
+ .put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
}
- if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId())) {
- message.putAttributes(idAttribute, outgoingMessage.recordId());
+ if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+ message.getMutableAttributes().put(idAttribute, outgoingMessage.recordId);
}
request.addMessages(message);
@@ -252,6 +259,9 @@ public class PubsubGrpcClient extends PubsubClient {
PubsubMessage pubsubMessage = message.getMessage();
@Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
+ // Payload.
+ byte[] elementBytes = pubsubMessage.getData().toByteArray();
+
// Timestamp.
String pubsubTimestampString = null;
Timestamp timestampProto = pubsubMessage.getPublishTime();
@@ -277,8 +287,13 @@ public class PubsubGrpcClient extends PubsubClient {
}
incomingMessages.add(
- IncomingMessage.of(
- pubsubMessage, timestampMsSinceEpoch, requestTimeMsSinceEpoch, ackId, recordId));
+ new IncomingMessage(
+ elementBytes,
+ attributes,
+ timestampMsSinceEpoch,
+ requestTimeMsSinceEpoch,
+ ackId,
+ recordId));
}
return incomingMessages;
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 5f6d044..da5266f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -1303,14 +1303,7 @@ public class PubsubIO {
}
// NOTE: The record id is always null.
- output.add(
- OutgoingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFrom(payload))
- .putAllAttributes(attributes)
- .build(),
- c.timestamp().getMillis(),
- null));
+ output.add(new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null));
currentOutputBytes += payload.length;
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index 7f3c771..136b1d2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -39,7 +39,6 @@ import com.google.api.services.pubsub.model.Topic;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -124,12 +123,8 @@ public class PubsubJsonClient extends PubsubClient {
public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
for (OutgoingMessage outgoingMessage : outgoingMessages) {
- PubsubMessage pubsubMessage =
- new PubsubMessage().encodeData(outgoingMessage.message().getData().toByteArray());
+ PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
pubsubMessage.setAttributes(getMessageAttributes(outgoingMessage));
- if (!outgoingMessage.message().getOrderingKey().isEmpty()) {
- pubsubMessage.put("orderingKey", outgoingMessage.message().getOrderingKey());
- }
pubsubMessages.add(pubsubMessage);
}
PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
@@ -140,16 +135,16 @@ public class PubsubJsonClient extends PubsubClient {
private Map<String, String> getMessageAttributes(OutgoingMessage outgoingMessage) {
Map<String, String> attributes = null;
- if (outgoingMessage.message().getAttributesMap() == null) {
+ if (outgoingMessage.attributes == null) {
attributes = new TreeMap<>();
} else {
- attributes = new TreeMap<>(outgoingMessage.message().getAttributesMap());
+ attributes = new TreeMap<>(outgoingMessage.attributes);
}
if (timestampAttribute != null) {
- attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch()));
+ attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
}
- if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId())) {
- attributes.put(idAttribute, outgoingMessage.recordId());
+ if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+ attributes.put(idAttribute, outgoingMessage.recordId);
}
return attributes;
}
@@ -197,15 +192,10 @@ public class PubsubJsonClient extends PubsubClient {
recordId = pubsubMessage.getMessageId();
}
- com.google.pubsub.v1.PubsubMessage.Builder protoMessage =
- com.google.pubsub.v1.PubsubMessage.newBuilder();
- protoMessage.setData(ByteString.copyFrom(elementBytes));
- protoMessage.putAllAttributes(attributes);
- protoMessage.setOrderingKey(
- (String) pubsubMessage.getUnknownKeys().getOrDefault("orderingKey", ""));
incomingMessages.add(
- IncomingMessage.of(
- protoMessage.build(),
+ new IncomingMessage(
+ elementBytes,
+ attributes,
timestampMsSinceEpoch,
requestTimeMsSinceEpoch,
ackId,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
index c3b915d..6b20b56 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
@@ -309,17 +309,12 @@ public class PubsubTestClient extends PubsubClient implements Serializable {
IncomingMessage incomingMessage = pendItr.next();
pendItr.remove();
IncomingMessage incomingMessageWithRequestTime =
- IncomingMessage.of(
- incomingMessage.message(),
- incomingMessage.timestampMsSinceEpoch(),
- requestTimeMsSinceEpoch,
- incomingMessage.ackId(),
- incomingMessage.recordId());
+ incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
incomingMessages.add(incomingMessageWithRequestTime);
STATE.pendingAckIncomingMessages.put(
- incomingMessageWithRequestTime.ackId(), incomingMessageWithRequestTime);
+ incomingMessageWithRequestTime.ackId, incomingMessageWithRequestTime);
STATE.ackDeadline.put(
- incomingMessageWithRequestTime.ackId(),
+ incomingMessageWithRequestTime.ackId,
requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000);
if (incomingMessages.size() >= batchSize) {
break;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 8be8c56..1258d0b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
@@ -37,7 +38,6 @@ import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
@@ -101,18 +101,19 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
@Override
public void encode(OutgoingMessage value, OutputStream outStream)
throws CoderException, IOException {
- ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).encode(value.message(), outStream);
- BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch(), outStream);
- RECORD_ID_CODER.encode(value.recordId(), outStream);
+ ByteArrayCoder.of().encode(value.elementBytes, outStream);
+ ATTRIBUTES_CODER.encode(value.attributes, outStream);
+ BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream);
+ RECORD_ID_CODER.encode(value.recordId, outStream);
}
@Override
public OutgoingMessage decode(InputStream inStream) throws CoderException, IOException {
- com.google.pubsub.v1.PubsubMessage message =
- ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).decode(inStream);
+ byte[] elementBytes = ByteArrayCoder.of().decode(inStream);
+ Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream);
long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream);
@Nullable String recordId = RECORD_ID_CODER.decode(inStream);
- return OutgoingMessage.of(message, timestampMsSinceEpoch, recordId);
+ return new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId);
}
}
@@ -153,6 +154,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
elementCounter.inc();
PubsubMessage message = c.element();
byte[] elementBytes = message.getPayload();
+ Map<String, String> attributes = message.getAttributeMap();
long timestampMsSinceEpoch = c.timestamp().getMillis();
@Nullable String recordId = null;
@@ -173,7 +175,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
c.output(
KV.of(
ThreadLocalRandom.current().nextInt(numShards),
- OutgoingMessage.of(message, timestampMsSinceEpoch, recordId)));
+ new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId)));
}
@Override
@@ -244,8 +246,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize);
int bytes = 0;
for (OutgoingMessage message : c.element().getValue()) {
- if (!pubsubMessages.isEmpty()
- && bytes + message.message().getData().size() > publishBatchBytes) {
+ if (!pubsubMessages.isEmpty() && bytes + message.elementBytes.length > publishBatchBytes) {
// Break large (in bytes) batches into smaller.
// (We've already broken by batch size using the trigger below, though that may
// run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since
@@ -256,7 +257,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
bytes = 0;
}
pubsubMessages.add(message);
- bytes += message.message().getData().size();
+ bytes += message.elementBytes.length;
}
if (!pubsubMessages.isEmpty()) {
// BLOCKS until published.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 230161c..d8abfe1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -727,18 +727,18 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
// Capture the received messages.
for (PubsubClient.IncomingMessage incomingMessage : receivedMessages) {
notYetRead.add(incomingMessage);
- notYetReadBytes += incomingMessage.message().getData().size();
+ notYetReadBytes += incomingMessage.elementBytes.length;
inFlight.put(
- incomingMessage.ackId(),
+ incomingMessage.ackId,
new InFlightState(requestTimeMsSinceEpoch, deadlineMsSinceEpoch));
numReceived++;
numReceivedRecently.add(requestTimeMsSinceEpoch, 1L);
minReceivedTimestampMsSinceEpoch.add(
- requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch());
+ requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch);
maxReceivedTimestampMsSinceEpoch.add(
- requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch());
+ requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch);
minUnreadTimestampMsSinceEpoch.add(
- requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch());
+ requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch);
}
}
@@ -837,7 +837,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
if (current != null) {
// Current is consumed. It can no longer contribute to holding back the watermark.
- minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch());
+ minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch);
current = null;
}
@@ -864,18 +864,18 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
// Try again later.
return false;
}
- notYetReadBytes -= current.message().getData().size();
+ notYetReadBytes -= current.elementBytes.length;
checkState(notYetReadBytes >= 0);
long nowMsSinceEpoch = now();
- numReadBytes.add(nowMsSinceEpoch, current.message().getData().size());
- minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, current.timestampMsSinceEpoch());
- if (current.timestampMsSinceEpoch() < lastWatermarkMsSinceEpoch) {
+ numReadBytes.add(nowMsSinceEpoch, current.elementBytes.length);
+ minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, current.timestampMsSinceEpoch);
+ if (current.timestampMsSinceEpoch < lastWatermarkMsSinceEpoch) {
numLateMessages.add(nowMsSinceEpoch, 1L);
}
// Current message can be considered 'read' and will be persisted by the next
// checkpoint. So it is now safe to ACK back to Pubsub.
- safeToAckIds.add(current.ackId());
+ safeToAckIds.add(current.ackId);
return true;
}
@@ -884,10 +884,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
if (current == null) {
throw new NoSuchElementException();
}
- return new PubsubMessage(
- current.message().getData().toByteArray(),
- current.message().getAttributesMap(),
- current.recordId());
+ return new PubsubMessage(current.elementBytes, current.attributes, current.recordId);
}
@Override
@@ -895,7 +892,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
if (current == null) {
throw new NoSuchElementException();
}
- return new Instant(current.timestampMsSinceEpoch());
+ return new Instant(current.timestampMsSinceEpoch);
}
@Override
@@ -903,7 +900,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
if (current == null) {
throw new NoSuchElementException();
}
- return current.recordId().getBytes(StandardCharsets.UTF_8);
+ return current.recordId.getBytes(StandardCharsets.UTF_8);
}
/**
@@ -987,7 +984,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
List<String> snapshotSafeToAckIds = Lists.newArrayList(safeToAckIds);
List<String> snapshotNotYetReadIds = new ArrayList<>(notYetRead.size());
for (PubsubClient.IncomingMessage incomingMessage : notYetRead) {
- snapshotNotYetReadIds.add(incomingMessage.ackId());
+ snapshotNotYetReadIds.add(incomingMessage.ackId);
}
if (outer.subscriptionPath == null) {
// need to include the subscription in case we resume, as it's not stored in the source.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
index e1e8711..1e75d43 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
@@ -22,14 +22,12 @@ import static org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.projectPathFromPath
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
-import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
@@ -207,16 +205,11 @@ public class TestPubsub implements TestRule {
if (!messages.isEmpty()) {
pubsub.acknowledge(
subscriptionPath,
- messages.stream().map(IncomingMessage::ackId).collect(ImmutableList.toImmutableList()));
+ messages.stream().map(msg -> msg.ackId).collect(ImmutableList.toImmutableList()));
}
return messages.stream()
- .map(
- msg ->
- new PubsubMessage(
- msg.message().getData().toByteArray(),
- msg.message().getAttributesMap(),
- msg.recordId()))
+ .map(msg -> new PubsubMessage(msg.elementBytes, msg.attributes, msg.recordId))
.collect(ImmutableList.toImmutableList());
}
@@ -299,12 +292,7 @@ public class TestPubsub implements TestRule {
}
private PubsubClient.OutgoingMessage toOutgoingMessage(PubsubMessage message) {
- return PubsubClient.OutgoingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFrom(message.getPayload()))
- .putAllAttributes(message.getAttributeMap())
- .build(),
- DateTime.now().getMillis(),
- null);
+ return new PubsubClient.OutgoingMessage(
+ message.getPayload(), message.getAttributeMap(), DateTime.now().getMillis(), null);
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
index de4a715..f4f8b18 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.pubsub;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toList;
import static org.apache.beam.sdk.io.gcp.pubsub.TestPubsub.createTopicName;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
@@ -29,7 +30,6 @@ import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.state.BagState;
@@ -251,7 +251,7 @@ public class TestPubsubSignal implements TestRule {
try {
signal = pubsub.pull(DateTime.now().getMillis(), signalSubscriptionPath, 1, false);
pubsub.acknowledge(
- signalSubscriptionPath, signal.stream().map(IncomingMessage::ackId).collect(toList()));
+ signalSubscriptionPath, signal.stream().map(m -> m.ackId).collect(toList()));
break;
} catch (StatusRuntimeException e) {
if (!Status.DEADLINE_EXCEEDED.equals(e.getStatus())) {
@@ -271,7 +271,7 @@ public class TestPubsubSignal implements TestRule {
signalSubscriptionPath, duration.getStandardSeconds()));
}
- return signal.get(0).message().getData().toStringUtf8();
+ return new String(signal.get(0).elementBytes, UTF_8);
}
private void sleep(long t) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index bc146ce..50f6548 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -67,7 +67,6 @@ public class GcpApiSurfaceTest {
classesInPackage("com.google.cloud.bigtable.config"),
classesInPackage("com.google.cloud.bigtable.data"),
classesInPackage("com.google.spanner.v1"),
- classesInPackage("com.google.pubsub.v1"),
Matchers.equalTo(com.google.api.gax.rpc.ApiException.class),
Matchers.<Class<?>>equalTo(com.google.api.gax.longrunning.OperationFuture.class),
Matchers.<Class<?>>equalTo(com.google.api.gax.longrunning.OperationSnapshot.class),
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
index 4dd719b..7c53170 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
@@ -142,11 +142,11 @@ public class PubsubGrpcClientTest {
List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
assertEquals(1, acutalMessages.size());
IncomingMessage actualMessage = acutalMessages.get(0);
- assertEquals(ACK_ID, actualMessage.ackId());
- assertEquals(DATA, actualMessage.message().getData().toStringUtf8());
- assertEquals(RECORD_ID, actualMessage.recordId());
- assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch());
- assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch());
+ assertEquals(ACK_ID, actualMessage.ackId);
+ assertEquals(DATA, new String(actualMessage.elementBytes, StandardCharsets.UTF_8));
+ assertEquals(RECORD_ID, actualMessage.recordId);
+ assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
+ assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived));
} finally {
server.shutdownNow();
@@ -187,13 +187,8 @@ public class PubsubGrpcClientTest {
InProcessServerBuilder.forName(channelName).addService(publisherImplBase).build().start();
try {
OutgoingMessage actualMessage =
- OutgoingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8(DATA))
- .putAllAttributes(ATTRIBUTES)
- .build(),
- MESSAGE_TIME,
- RECORD_ID);
+ new OutgoingMessage(
+ DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, MESSAGE_TIME, RECORD_ID);
int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
assertEquals(1, n);
assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 0dc910f..65b89a7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -29,7 +29,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import com.google.api.client.util.Clock;
-import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
@@ -392,10 +391,9 @@ public class PubsubIOTest {
})
.map(
ba ->
- IncomingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFrom(ba))
- .build(),
+ new IncomingMessage(
+ ba,
+ null,
1234L,
0,
UUID.randomUUID().toString(),
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
index 434f0a3..f7fc0f3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -34,7 +34,6 @@ import com.google.api.services.pubsub.model.PullResponse;
import com.google.api.services.pubsub.model.ReceivedMessage;
import com.google.api.services.pubsub.model.Subscription;
import com.google.api.services.pubsub.model.Topic;
-import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
@@ -74,7 +73,6 @@ public class PubsubJsonClientTest {
private static final String DATA = "testData";
private static final String RECORD_ID = "testRecordId";
private static final String ACK_ID = "testAckId";
- private static final String ORDERING_KEY = "testOrderingKey";
@Before
public void setup() {
@@ -100,8 +98,7 @@ public class PubsubJsonClientTest {
.setPublishTime(String.valueOf(PUB_TIME))
.setAttributes(
ImmutableMap.of(
- TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME), ID_ATTRIBUTE, RECORD_ID))
- .set("orderingKey", ORDERING_KEY);
+ TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME), ID_ATTRIBUTE, RECORD_ID));
ReceivedMessage expectedReceivedMessage =
new ReceivedMessage().setMessage(expectedPubsubMessage).setAckId(ACK_ID);
PullResponse expectedResponse =
@@ -116,12 +113,11 @@ public class PubsubJsonClientTest {
List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
assertEquals(1, acutalMessages.size());
IncomingMessage actualMessage = acutalMessages.get(0);
- assertEquals(ACK_ID, actualMessage.ackId());
- assertEquals(DATA, actualMessage.message().getData().toStringUtf8());
- assertEquals(RECORD_ID, actualMessage.recordId());
- assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch());
- assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch());
- assertEquals(ORDERING_KEY, actualMessage.message().getOrderingKey());
+ assertEquals(ACK_ID, actualMessage.ackId);
+ assertEquals(DATA, new String(actualMessage.elementBytes, StandardCharsets.UTF_8));
+ assertEquals(RECORD_ID, actualMessage.recordId);
+ assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
+ assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
}
@Test
@@ -150,7 +146,7 @@ public class PubsubJsonClientTest {
List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
assertEquals(1, acutalMessages.size());
IncomingMessage actualMessage = acutalMessages.get(0);
- assertArrayEquals(new byte[0], actualMessage.message().getData().toByteArray());
+ assertArrayEquals(new byte[0], actualMessage.elementBytes);
}
@Test
@@ -164,8 +160,7 @@ public class PubsubJsonClientTest {
.put(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME))
.put(ID_ATTRIBUTE, RECORD_ID)
.put("k", "v")
- .build())
- .set("orderingKey", ORDERING_KEY);
+ .build());
PublishRequest expectedRequest =
new PublishRequest().setMessages(ImmutableList.of(expectedPubsubMessage));
PublishResponse expectedResponse =
@@ -176,14 +171,7 @@ public class PubsubJsonClientTest {
Map<String, String> attrs = new HashMap<>();
attrs.put("k", "v");
OutgoingMessage actualMessage =
- OutgoingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8(DATA))
- .putAllAttributes(attrs)
- .setOrderingKey(ORDERING_KEY)
- .build(),
- MESSAGE_TIME,
- RECORD_ID);
+ new OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), attrs, MESSAGE_TIME, RECORD_ID);
int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
assertEquals(1, n);
}
@@ -207,12 +195,8 @@ public class PubsubJsonClientTest {
(mockPubsub.projects().topics().publish(expectedTopic, expectedRequest).execute()))
.thenReturn(expectedResponse);
OutgoingMessage actualMessage =
- OutgoingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8(DATA))
- .build(),
- MESSAGE_TIME,
- RECORD_ID);
+ new OutgoingMessage(
+ DATA.getBytes(StandardCharsets.UTF_8), ImmutableMap.of(), MESSAGE_TIME, RECORD_ID);
int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
assertEquals(1, n);
}
@@ -238,13 +222,7 @@ public class PubsubJsonClientTest {
Map<String, String> attrs = new HashMap<>();
attrs.put("k", "v");
OutgoingMessage actualMessage =
- OutgoingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8(DATA))
- .putAllAttributes(attrs)
- .build(),
- MESSAGE_TIME,
- RECORD_ID);
+ new OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), attrs, MESSAGE_TIME, RECORD_ID);
int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
assertEquals(1, n);
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
index 6b920e8..2b698f0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
@@ -20,9 +20,8 @@ package org.apache.beam.sdk.io.gcp.pubsub;
import static org.junit.Assert.assertEquals;
import com.google.api.client.util.Clock;
-import com.google.protobuf.ByteString;
-import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
@@ -55,8 +54,9 @@ public class PubsubTestClientTest {
final AtomicLong now = new AtomicLong();
Clock clock = now::get;
IncomingMessage expectedIncomingMessage =
- IncomingMessage.of(
- PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).build(),
+ new IncomingMessage(
+ DATA.getBytes(StandardCharsets.UTF_8),
+ null,
MESSAGE_TIME,
REQ_TIME,
ACK_ID,
@@ -75,14 +75,7 @@ public class PubsubTestClientTest {
client.advance();
incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
assertEquals(1, incomingMessages.size());
- assertEquals(
- IncomingMessage.of(
- expectedIncomingMessage.message(),
- expectedIncomingMessage.timestampMsSinceEpoch(),
- now.get(),
- expectedIncomingMessage.ackId(),
- expectedIncomingMessage.recordId()),
- incomingMessages.get(0));
+ assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
now.addAndGet(10 * 1000);
client.advance();
// Extend ack
@@ -92,14 +85,7 @@ public class PubsubTestClientTest {
client.advance();
incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
assertEquals(1, incomingMessages.size());
- assertEquals(
- IncomingMessage.of(
- expectedIncomingMessage.message(),
- expectedIncomingMessage.timestampMsSinceEpoch(),
- now.get(),
- expectedIncomingMessage.ackId(),
- expectedIncomingMessage.recordId()),
- incomingMessages.get(0));
+ assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
// Extend ack
client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
// Ack
@@ -113,10 +99,7 @@ public class PubsubTestClientTest {
@Test
public void publishOneMessage() throws IOException {
OutgoingMessage expectedOutgoingMessage =
- OutgoingMessage.of(
- PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).build(),
- MESSAGE_TIME,
- MESSAGE_ID);
+ new OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), null, MESSAGE_TIME, MESSAGE_ID);
try (PubsubTestClientFactory factory =
PubsubTestClient.createFactoryForPublish(
TOPIC, Sets.newHashSet(expectedOutgoingMessage), ImmutableList.of())) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
index f8cd86e..f588e05 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.gcp.pubsub;
-import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
@@ -84,12 +83,8 @@ public class PubsubUnboundedSinkTest implements Serializable {
@Test
public void saneCoder() throws Exception {
OutgoingMessage message =
- OutgoingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8(DATA))
- .build(),
- TIMESTAMP,
- getRecordId(DATA));
+ new OutgoingMessage(
+ DATA.getBytes(StandardCharsets.UTF_8), ImmutableMap.of(), TIMESTAMP, getRecordId(DATA));
CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message);
CoderProperties.coderSerializable(PubsubUnboundedSink.CODER);
}
@@ -98,13 +93,8 @@ public class PubsubUnboundedSinkTest implements Serializable {
public void sendOneMessage() throws IOException {
List<OutgoingMessage> outgoing =
ImmutableList.of(
- OutgoingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8(DATA))
- .putAllAttributes(ATTRIBUTES)
- .build(),
- TIMESTAMP,
- getRecordId(DATA)));
+ new OutgoingMessage(
+ DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, TIMESTAMP, getRecordId(DATA)));
int batchSize = 1;
int batchBytes = 1;
try (PubsubTestClientFactory factory =
@@ -131,10 +121,9 @@ public class PubsubUnboundedSinkTest implements Serializable {
public void sendOneMessageWithoutAttributes() throws IOException {
List<OutgoingMessage> outgoing =
ImmutableList.of(
- OutgoingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8(DATA))
- .build(),
+ new OutgoingMessage(
+ DATA.getBytes(StandardCharsets.UTF_8),
+ null /* attributes */,
TIMESTAMP,
getRecordId(DATA)));
try (PubsubTestClientFactory factory =
@@ -168,10 +157,9 @@ public class PubsubUnboundedSinkTest implements Serializable {
for (int i = 0; i < batchSize * 10; i++) {
String str = String.valueOf(i);
outgoing.add(
- OutgoingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8(str))
- .build(),
+ new OutgoingMessage(
+ str.getBytes(StandardCharsets.UTF_8),
+ ImmutableMap.of(),
TIMESTAMP,
getRecordId(str)));
data.add(str);
@@ -210,10 +198,9 @@ public class PubsubUnboundedSinkTest implements Serializable {
}
String str = sb.toString();
outgoing.add(
- OutgoingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8(str))
- .build(),
+ new OutgoingMessage(
+ str.getBytes(StandardCharsets.UTF_8),
+ ImmutableMap.of(),
TIMESTAMP,
getRecordId(str)));
data.add(str);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
index 43ecbdc..b2dacf0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -31,7 +31,6 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import com.google.api.client.util.Clock;
-import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -101,14 +100,8 @@ public class PubsubUnboundedSourceTest {
private void setupOneMessage() {
setupOneMessage(
ImmutableList.of(
- IncomingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8(DATA))
- .build(),
- TIMESTAMP,
- 0,
- ACK_ID,
- RECORD_ID)));
+ new IncomingMessage(
+ DATA.getBytes(StandardCharsets.UTF_8), null, TIMESTAMP, 0, ACK_ID, RECORD_ID)));
}
@After
@@ -226,14 +219,8 @@ public class PubsubUnboundedSourceTest {
String data = String.format("data_%d", i);
String ackid = String.format("ackid_%d", i);
incoming.add(
- IncomingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8(data))
- .build(),
- TIMESTAMP,
- 0,
- ackid,
- RECORD_ID));
+ new IncomingMessage(
+ data.getBytes(StandardCharsets.UTF_8), null, TIMESTAMP, 0, ackid, RECORD_ID));
}
setupOneMessage(incoming);
PubsubReader reader = primSource.createReader(p.getOptions(), null);
@@ -292,10 +279,9 @@ public class PubsubUnboundedSourceTest {
String recid = String.format("recordid_%d", messageNum);
String ackId = String.format("ackid_%d", messageNum);
incoming.add(
- IncomingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8(data))
- .build(),
+ new IncomingMessage(
+ data.getBytes(StandardCharsets.UTF_8),
+ null,
messageNumToTimestamp(messageNum),
0,
ackId,