You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2020/12/23 18:12:26 UTC

[beam] branch master updated: [BEAM-11159] Use official GCP client in TestPubSubSignal (#13598)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c17270b  [BEAM-11159] Use official GCP client in TestPubSubSignal (#13598)
c17270b is described below

commit c17270b848768d68e87df7d48485e3912fcd7c93
Author: Brian Hulette <bh...@google.com>
AuthorDate: Wed Dec 23 10:11:53 2020 -0800

    [BEAM-11159] Use official GCP client in TestPubSubSignal (#13598)
    
    * Update TestPubsubSignal to use GCP client
    
    * minor cleanup in TestPubsub
---
 .../apache/beam/sdk/io/gcp/pubsub/TestPubsub.java  |  17 +--
 .../beam/sdk/io/gcp/pubsub/TestPubsubSignal.java   | 150 ++++++++++++++-------
 2 files changed, 109 insertions(+), 58 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
index 2c32c55..27071cc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
@@ -32,7 +32,6 @@ import com.google.cloud.pubsub.v1.TopicAdminClient;
 import com.google.cloud.pubsub.v1.TopicAdminSettings;
 import com.google.protobuf.ByteString;
 import com.google.pubsub.v1.PushConfig;
-import com.google.pubsub.v1.Subscription;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -142,21 +141,23 @@ public class TestPubsub implements TestRule {
             pipelineOptions.getProject(), createTopicName(description, EVENTS_TOPIC_NAME));
     topicAdmin.createTopic(eventsTopicPathTmp.getPath());
 
+    // Set this after successful creation; it signals that the topic needs teardown
     eventsTopicPath = eventsTopicPathTmp;
 
     String subscriptionName =
         topicPath().getName() + "_beam_" + ThreadLocalRandom.current().nextLong();
-    subscriptionPath =
+    SubscriptionPath subscriptionPathTmp =
         new SubscriptionPath(
             String.format(
                 "projects/%s/subscriptions/%s", pipelineOptions.getProject(), subscriptionName));
 
-    Subscription subscription =
-        subscriptionAdmin.createSubscription(
-            subscriptionPath.getPath(),
-            topicPath().getPath(),
-            PushConfig.getDefaultInstance(),
-            DEFAULT_ACK_DEADLINE_SECONDS);
+    subscriptionAdmin.createSubscription(
+        subscriptionPathTmp.getPath(),
+        topicPath().getPath(),
+        PushConfig.getDefaultInstance(),
+        DEFAULT_ACK_DEADLINE_SECONDS);
+
+    subscriptionPath = subscriptionPathTmp;
   }
 
   private void tearDown() throws IOException {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
index c185914..86f80ef 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
@@ -17,18 +17,23 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsub;
 
-import static java.util.stream.Collectors.toList;
 import static org.apache.beam.sdk.io.gcp.pubsub.TestPubsub.createTopicName;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
-import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.pubsub.v1.PushConfig;
 import java.io.IOException;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
 import org.apache.beam.sdk.state.BagState;
@@ -55,6 +60,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
+import org.joda.time.Seconds;
 import org.junit.rules.TestRule;
 import org.junit.runner.Description;
 import org.junit.runners.model.Statement;
@@ -76,12 +82,16 @@ public class TestPubsubSignal implements TestRule {
   private static final String RESULT_SUCCESS_MESSAGE = "SUCCESS";
   private static final String START_TOPIC_NAME = "start";
   private static final String START_SIGNAL_MESSAGE = "START SIGNAL";
+  private static final Integer DEFAULT_ACK_DEADLINE_SECONDS = 60;
 
   private static final String NO_ID_ATTRIBUTE = null;
   private static final String NO_TIMESTAMP_ATTRIBUTE = null;
 
-  PubsubClient pubsub;
-  private TestPubsubOptions pipelineOptions;
+  private final TestPubsubOptions pipelineOptions;
+  private final String pubsubEndpoint;
+
+  private @Nullable TopicAdminClient topicAdmin = null;
+  private @Nullable SubscriptionAdminClient subscriptionAdmin = null;
   private @Nullable TopicPath resultTopicPath = null;
   private @Nullable TopicPath startTopicPath = null;
 
@@ -97,6 +107,7 @@ public class TestPubsubSignal implements TestRule {
 
   private TestPubsubSignal(TestPubsubOptions pipelineOptions) {
     this.pipelineOptions = pipelineOptions;
+    this.pubsubEndpoint = PubsubOptions.targetForRootUrl(this.pipelineOptions.getPubsubRootUrl());
   }
 
   @Override
@@ -104,7 +115,7 @@ public class TestPubsubSignal implements TestRule {
     return new Statement() {
       @Override
       public void evaluate() throws Throwable {
-        if (TestPubsubSignal.this.pubsub != null) {
+        if (topicAdmin != null || subscriptionAdmin != null) {
           throw new AssertionError(
               "Pubsub client was not shutdown in previous test. "
                   + "Topic path is'"
@@ -125,9 +136,18 @@ public class TestPubsubSignal implements TestRule {
   }
 
   private void initializePubsub(Description description) throws IOException {
-    pubsub =
-        PubsubGrpcClient.FACTORY.newClient(
-            NO_TIMESTAMP_ATTRIBUTE, NO_ID_ATTRIBUTE, pipelineOptions);
+    topicAdmin =
+        TopicAdminClient.create(
+            TopicAdminSettings.newBuilder()
+                .setCredentialsProvider(pipelineOptions::getGcpCredential)
+                .setEndpoint(pubsubEndpoint)
+                .build());
+    subscriptionAdmin =
+        SubscriptionAdminClient.create(
+            SubscriptionAdminSettings.newBuilder()
+                .setCredentialsProvider(pipelineOptions::getGcpCredential)
+                .setEndpoint(pubsubEndpoint)
+                .build());
 
     // Example topic name:
     //    integ-test-TestClassName-testMethodName-2018-12-11-23-32-333-<random-long>-result
@@ -138,8 +158,8 @@ public class TestPubsubSignal implements TestRule {
         PubsubClient.topicPathFromName(
             pipelineOptions.getProject(), createTopicName(description, START_TOPIC_NAME));
 
-    pubsub.createTopic(resultTopicPathTmp);
-    pubsub.createTopic(startTopicPathTmp);
+    topicAdmin.createTopic(resultTopicPathTmp.getPath());
+    topicAdmin.createTopic(startTopicPathTmp.getPath());
 
     // Set these after successful creation; this signals that they need teardown
     resultTopicPath = resultTopicPathTmp;
@@ -147,21 +167,34 @@ public class TestPubsubSignal implements TestRule {
   }
 
   private void tearDown() throws IOException {
-    if (pubsub == null) {
+    if (subscriptionAdmin == null || topicAdmin == null) {
       return;
     }
 
     try {
       if (resultTopicPath != null) {
-        pubsub.deleteTopic(resultTopicPath);
+        for (String subscriptionPath :
+            topicAdmin.listTopicSubscriptions(resultTopicPath.getPath()).iterateAll()) {
+          subscriptionAdmin.deleteSubscription(subscriptionPath);
+        }
+        topicAdmin.deleteTopic(resultTopicPath.getPath());
       }
       if (startTopicPath != null) {
-        pubsub.deleteTopic(startTopicPath);
+        for (String subscriptionPath :
+            topicAdmin.listTopicSubscriptions(startTopicPath.getPath()).iterateAll()) {
+          subscriptionAdmin.deleteSubscription(subscriptionPath);
+        }
+        topicAdmin.deleteTopic(startTopicPath.getPath());
       }
     } finally {
-      pubsub.close();
-      pubsub = null;
+      subscriptionAdmin.close();
+      topicAdmin.close();
+
+      subscriptionAdmin = null;
+      topicAdmin = null;
+
       resultTopicPath = null;
+      startTopicPath = null;
     }
   }
 
@@ -215,8 +248,11 @@ public class TestPubsubSignal implements TestRule {
             pipelineOptions.getProject(),
             "start-subscription-" + String.valueOf(ThreadLocalRandom.current().nextLong()));
 
-    pubsub.createSubscription(
-        startTopicPath, startSubscriptionPath, (int) duration.getStandardSeconds());
+    subscriptionAdmin.createSubscription(
+        startSubscriptionPath.getPath(), // subscription name precedes the topic
+        startTopicPath.getPath(),
+        PushConfig.getDefaultInstance(),
+        (int) duration.getStandardSeconds());
 
     return Suppliers.memoize(
         () -> {
@@ -228,8 +264,8 @@ public class TestPubsubSignal implements TestRule {
             throw new RuntimeException(e);
           } finally {
             try {
-              pubsub.deleteSubscription(startSubscriptionPath);
-            } catch (IOException e) {
+              subscriptionAdmin.deleteSubscription(startSubscriptionPath.getPath());
+            } catch (ApiException e) {
               LOG.error(String.format("Leaked PubSub subscription '%s'", startSubscriptionPath));
             }
           }
@@ -243,14 +279,17 @@ public class TestPubsubSignal implements TestRule {
             pipelineOptions.getProject(),
             "result-subscription-" + String.valueOf(ThreadLocalRandom.current().nextLong()));
 
-    pubsub.createSubscription(
-        resultTopicPath, resultSubscriptionPath, (int) duration.getStandardSeconds());
+    subscriptionAdmin.createSubscription(
+        resultSubscriptionPath.getPath(),
+        resultTopicPath.getPath(),
+        PushConfig.getDefaultInstance(),
+        (int) duration.getStandardSeconds());
 
     String result = pollForResultForDuration(resultSubscriptionPath, duration);
 
     try {
-      pubsub.deleteSubscription(resultSubscriptionPath);
-    } catch (IOException e) {
+      subscriptionAdmin.deleteSubscription(resultSubscriptionPath.getPath());
+    } catch (ApiException e) {
       LOG.error(String.format("Leaked PubSub subscription '%s'", resultSubscriptionPath));
     }
 
@@ -260,39 +299,50 @@ public class TestPubsubSignal implements TestRule {
   }
 
   private String pollForResultForDuration(
-      SubscriptionPath signalSubscriptionPath, Duration duration) throws IOException {
+      SubscriptionPath signalSubscriptionPath, Duration timeoutDuration) throws IOException {
 
-    List<PubsubClient.IncomingMessage> signal = null;
-    DateTime endPolling = DateTime.now().plus(duration.getMillis());
+    AtomicReference<String> result = new AtomicReference<>(null);
 
-    do {
+    MessageReceiver receiver =
+        (com.google.pubsub.v1.PubsubMessage message, AckReplyConsumer replyConsumer) -> {
+          if (message.getData().isEmpty()) {
+            // Ignore empty messages: ack them, but never record them as the result.
+            replyConsumer.ack();
+          } else if (result.compareAndSet(null, message.getData().toStringUtf8())) {
+            // The first non-empty payload wins and becomes the result.
+            replyConsumer.ack();
+          } else {
+            replyConsumer.nack();
+          }
+        };
+
+    Subscriber subscriber =
+        Subscriber.newBuilder(signalSubscriptionPath.getPath(), receiver)
+            .setCredentialsProvider(pipelineOptions::getGcpCredential)
+            .setEndpoint(pubsubEndpoint)
+            .build();
+    subscriber.startAsync();
+
+    DateTime startTime = new DateTime();
+    int timeoutSeconds = timeoutDuration.toStandardSeconds().getSeconds();
+    while (result.get() == null
+        && Seconds.secondsBetween(startTime, new DateTime()).getSeconds() < timeoutSeconds) {
       try {
-        signal = pubsub.pull(DateTime.now().getMillis(), signalSubscriptionPath, 1, false);
-        if (signal.isEmpty()) {
-          continue;
-        }
-        pubsub.acknowledge(
-            signalSubscriptionPath, signal.stream().map(IncomingMessage::ackId).collect(toList()));
-        break;
-      } catch (StatusRuntimeException e) {
-        if (!Status.DEADLINE_EXCEEDED.equals(e.getStatus())) {
-          LOG.warn(
-              "(Will retry) Error while polling {} for signal: {}",
-              signalSubscriptionPath,
-              e.getStatus());
-        }
-        sleep(500);
+        Thread.sleep(1000);
+      } catch (InterruptedException ignored) {
       }
-    } while (DateTime.now().isBefore(endPolling));
+    }
+
+    subscriber.stopAsync();
+    subscriber.awaitTerminated();
 
-    if (signal == null || signal.isEmpty()) {
+    if (result.get() == null) {
       throw new AssertionError(
           String.format(
               "Did not receive signal on %s in %ss",
-              signalSubscriptionPath, duration.getStandardSeconds()));
+              signalSubscriptionPath, timeoutDuration.getStandardSeconds()));
     }
-
-    return signal.get(0).message().getData().toStringUtf8();
+    return result.get();
   }
 
   private void sleep(long t) {