You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2020/12/23 18:12:26 UTC
[beam] branch master updated: [BEAM-11159] Use official GCP client
in TestPubSubSignal (#13598)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c17270b [BEAM-11159] Use official GCP client in TestPubSubSignal (#13598)
c17270b is described below
commit c17270b848768d68e87df7d48485e3912fcd7c93
Author: Brian Hulette <bh...@google.com>
AuthorDate: Wed Dec 23 10:11:53 2020 -0800
[BEAM-11159] Use official GCP client in TestPubSubSignal (#13598)
* Update TestPubsubSignal to use GCP client
* minor cleanup in TestPubsub
---
.../apache/beam/sdk/io/gcp/pubsub/TestPubsub.java | 17 +--
.../beam/sdk/io/gcp/pubsub/TestPubsubSignal.java | 150 ++++++++++++++-------
2 files changed, 109 insertions(+), 58 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
index 2c32c55..27071cc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
@@ -32,7 +32,6 @@ import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PushConfig;
-import com.google.pubsub.v1.Subscription;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -142,21 +141,23 @@ public class TestPubsub implements TestRule {
pipelineOptions.getProject(), createTopicName(description, EVENTS_TOPIC_NAME));
topicAdmin.createTopic(eventsTopicPathTmp.getPath());
+ // Set this after successful creation; it signals that the topic needs teardown
eventsTopicPath = eventsTopicPathTmp;
String subscriptionName =
topicPath().getName() + "_beam_" + ThreadLocalRandom.current().nextLong();
- subscriptionPath =
+ SubscriptionPath subscriptionPathTmp =
new SubscriptionPath(
String.format(
"projects/%s/subscriptions/%s", pipelineOptions.getProject(), subscriptionName));
- Subscription subscription =
- subscriptionAdmin.createSubscription(
- subscriptionPath.getPath(),
- topicPath().getPath(),
- PushConfig.getDefaultInstance(),
- DEFAULT_ACK_DEADLINE_SECONDS);
+ subscriptionAdmin.createSubscription(
+ subscriptionPathTmp.getPath(),
+ topicPath().getPath(),
+ PushConfig.getDefaultInstance(),
+ DEFAULT_ACK_DEADLINE_SECONDS);
+
+ subscriptionPath = subscriptionPathTmp;
}
private void tearDown() throws IOException {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
index c185914..86f80ef 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
@@ -17,18 +17,23 @@
*/
package org.apache.beam.sdk.io.gcp.pubsub;
-import static java.util.stream.Collectors.toList;
import static org.apache.beam.sdk.io.gcp.pubsub.TestPubsub.createTopicName;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
-import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.pubsub.v1.PushConfig;
import java.io.IOException;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.state.BagState;
@@ -55,6 +60,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.Duration;
+import org.joda.time.Seconds;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
@@ -76,12 +82,16 @@ public class TestPubsubSignal implements TestRule {
private static final String RESULT_SUCCESS_MESSAGE = "SUCCESS";
private static final String START_TOPIC_NAME = "start";
private static final String START_SIGNAL_MESSAGE = "START SIGNAL";
+ private static final Integer DEFAULT_ACK_DEADLINE_SECONDS = 60;
private static final String NO_ID_ATTRIBUTE = null;
private static final String NO_TIMESTAMP_ATTRIBUTE = null;
- PubsubClient pubsub;
- private TestPubsubOptions pipelineOptions;
+ private final TestPubsubOptions pipelineOptions;
+ private final String pubsubEndpoint;
+
+ private @Nullable TopicAdminClient topicAdmin = null;
+ private @Nullable SubscriptionAdminClient subscriptionAdmin = null;
private @Nullable TopicPath resultTopicPath = null;
private @Nullable TopicPath startTopicPath = null;
@@ -97,6 +107,7 @@ public class TestPubsubSignal implements TestRule {
private TestPubsubSignal(TestPubsubOptions pipelineOptions) {
this.pipelineOptions = pipelineOptions;
+ this.pubsubEndpoint = PubsubOptions.targetForRootUrl(this.pipelineOptions.getPubsubRootUrl());
}
@Override
@@ -104,7 +115,7 @@ public class TestPubsubSignal implements TestRule {
return new Statement() {
@Override
public void evaluate() throws Throwable {
- if (TestPubsubSignal.this.pubsub != null) {
+ if (topicAdmin != null || subscriptionAdmin != null) {
throw new AssertionError(
"Pubsub client was not shutdown in previous test. "
+ "Topic path is'"
@@ -125,9 +136,18 @@ public class TestPubsubSignal implements TestRule {
}
private void initializePubsub(Description description) throws IOException {
- pubsub =
- PubsubGrpcClient.FACTORY.newClient(
- NO_TIMESTAMP_ATTRIBUTE, NO_ID_ATTRIBUTE, pipelineOptions);
+ topicAdmin =
+ TopicAdminClient.create(
+ TopicAdminSettings.newBuilder()
+ .setCredentialsProvider(pipelineOptions::getGcpCredential)
+ .setEndpoint(pubsubEndpoint)
+ .build());
+ subscriptionAdmin =
+ SubscriptionAdminClient.create(
+ SubscriptionAdminSettings.newBuilder()
+ .setCredentialsProvider(pipelineOptions::getGcpCredential)
+ .setEndpoint(pubsubEndpoint)
+ .build());
// Example topic name:
// integ-test-TestClassName-testMethodName-2018-12-11-23-32-333-<random-long>-result
@@ -138,8 +158,8 @@ public class TestPubsubSignal implements TestRule {
PubsubClient.topicPathFromName(
pipelineOptions.getProject(), createTopicName(description, START_TOPIC_NAME));
- pubsub.createTopic(resultTopicPathTmp);
- pubsub.createTopic(startTopicPathTmp);
+ topicAdmin.createTopic(resultTopicPathTmp.getPath());
+ topicAdmin.createTopic(startTopicPathTmp.getPath());
// Set these after successful creation; this signals that they need teardown
resultTopicPath = resultTopicPathTmp;
@@ -147,21 +167,34 @@ public class TestPubsubSignal implements TestRule {
}
private void tearDown() throws IOException {
- if (pubsub == null) {
+ if (subscriptionAdmin == null || topicAdmin == null) {
return;
}
try {
if (resultTopicPath != null) {
- pubsub.deleteTopic(resultTopicPath);
+ for (String subscriptionPath :
+ topicAdmin.listTopicSubscriptions(resultTopicPath.getPath()).iterateAll()) {
+ subscriptionAdmin.deleteSubscription(subscriptionPath);
+ }
+ topicAdmin.deleteTopic(resultTopicPath.getPath());
}
if (startTopicPath != null) {
- pubsub.deleteTopic(startTopicPath);
+ for (String subscriptionPath :
+ topicAdmin.listTopicSubscriptions(startTopicPath.getPath()).iterateAll()) {
+ subscriptionAdmin.deleteSubscription(subscriptionPath);
+ }
+ topicAdmin.deleteTopic(startTopicPath.getPath());
}
} finally {
- pubsub.close();
- pubsub = null;
+ subscriptionAdmin.close();
+ topicAdmin.close();
+
+ subscriptionAdmin = null;
+ topicAdmin = null;
+
resultTopicPath = null;
+ startTopicPath = null;
}
}
@@ -215,8 +248,11 @@ public class TestPubsubSignal implements TestRule {
pipelineOptions.getProject(),
"start-subscription-" + String.valueOf(ThreadLocalRandom.current().nextLong()));
- pubsub.createSubscription(
- startTopicPath, startSubscriptionPath, (int) duration.getStandardSeconds());
+ subscriptionAdmin.createSubscription(
+ startTopicPath.getPath(),
+ startSubscriptionPath.getPath(),
+ PushConfig.getDefaultInstance(),
+ (int) duration.getStandardSeconds());
return Suppliers.memoize(
() -> {
@@ -228,8 +264,8 @@ public class TestPubsubSignal implements TestRule {
throw new RuntimeException(e);
} finally {
try {
- pubsub.deleteSubscription(startSubscriptionPath);
- } catch (IOException e) {
+ subscriptionAdmin.deleteSubscription(startSubscriptionPath.getPath());
+ } catch (ApiException e) {
LOG.error(String.format("Leaked PubSub subscription '%s'", startSubscriptionPath));
}
}
@@ -243,14 +279,17 @@ public class TestPubsubSignal implements TestRule {
pipelineOptions.getProject(),
"result-subscription-" + String.valueOf(ThreadLocalRandom.current().nextLong()));
- pubsub.createSubscription(
- resultTopicPath, resultSubscriptionPath, (int) duration.getStandardSeconds());
+ subscriptionAdmin.createSubscription(
+ resultSubscriptionPath.getPath(),
+ resultTopicPath.getPath(),
+ PushConfig.getDefaultInstance(),
+ (int) duration.getStandardSeconds());
String result = pollForResultForDuration(resultSubscriptionPath, duration);
try {
- pubsub.deleteSubscription(resultSubscriptionPath);
- } catch (IOException e) {
+ subscriptionAdmin.deleteSubscription(resultSubscriptionPath.getPath());
+ } catch (ApiException e) {
LOG.error(String.format("Leaked PubSub subscription '%s'", resultSubscriptionPath));
}
@@ -260,39 +299,50 @@ public class TestPubsubSignal implements TestRule {
}
private String pollForResultForDuration(
- SubscriptionPath signalSubscriptionPath, Duration duration) throws IOException {
+ SubscriptionPath signalSubscriptionPath, Duration timeoutDuration) throws IOException {
- List<PubsubClient.IncomingMessage> signal = null;
- DateTime endPolling = DateTime.now().plus(duration.getMillis());
+ AtomicReference<String> result = new AtomicReference<>(null);
- do {
+ MessageReceiver receiver =
+ (com.google.pubsub.v1.PubsubMessage message, AckReplyConsumer replyConsumer) -> {
+ // Ignore empty messages
+ if (message.getData().isEmpty()) {
+ replyConsumer.ack();
+ }
+ if (result.compareAndSet(null, message.getData().toStringUtf8())) {
+ replyConsumer.ack();
+ } else {
+ replyConsumer.nack();
+ }
+ };
+
+ Subscriber subscriber =
+ Subscriber.newBuilder(signalSubscriptionPath.getPath(), receiver)
+ .setCredentialsProvider(pipelineOptions::getGcpCredential)
+ .setEndpoint(pubsubEndpoint)
+ .build();
+ subscriber.startAsync();
+
+ DateTime startTime = new DateTime();
+ int timeoutSeconds = timeoutDuration.toStandardSeconds().getSeconds();
+ while (result.get() == null
+ && Seconds.secondsBetween(startTime, new DateTime()).getSeconds() < timeoutSeconds) {
try {
- signal = pubsub.pull(DateTime.now().getMillis(), signalSubscriptionPath, 1, false);
- if (signal.isEmpty()) {
- continue;
- }
- pubsub.acknowledge(
- signalSubscriptionPath, signal.stream().map(IncomingMessage::ackId).collect(toList()));
- break;
- } catch (StatusRuntimeException e) {
- if (!Status.DEADLINE_EXCEEDED.equals(e.getStatus())) {
- LOG.warn(
- "(Will retry) Error while polling {} for signal: {}",
- signalSubscriptionPath,
- e.getStatus());
- }
- sleep(500);
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {
}
- } while (DateTime.now().isBefore(endPolling));
+ }
+
+ subscriber.stopAsync();
+ subscriber.awaitTerminated();
- if (signal == null || signal.isEmpty()) {
+ if (result.get() == null) {
throw new AssertionError(
String.format(
"Did not receive signal on %s in %ss",
- signalSubscriptionPath, duration.getStandardSeconds()));
+ signalSubscriptionPath, timeoutDuration.getStandardSeconds()));
}
-
- return signal.get(0).message().getData().toStringUtf8();
+ return result.get();
}
private void sleep(long t) {