You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2021/10/08 22:07:36 UTC

[beam] 01/01: Revert "[BEAM-12908] Change to use PubsubSignal for information propagation so this works on dataflow"

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch revert-15607-pubsub-signal-psl-it
in repository https://gitbox.apache.org/repos/asf/beam.git

commit cb09eb8f314e44023897ddbf49b0c9df8cb6f7e5
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Fri Oct 8 15:06:18 2021 -0700

    Revert "[BEAM-12908] Change to use PubsubSignal for information propagation so this works on dataflow"
---
 .../beam/sdk/io/gcp/pubsublite/ReadWriteIT.java    | 88 +++++++++++++++-------
 1 file changed, 59 insertions(+), 29 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
index bd15310..80c362a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.gcp.pubsublite;
 
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.junit.Assert.fail;
 
 import com.google.cloud.pubsublite.AdminClient;
 import com.google.cloud.pubsublite.AdminClientSettings;
@@ -35,18 +36,19 @@ import com.google.cloud.pubsublite.proto.Subscription;
 import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement;
 import com.google.cloud.pubsublite.proto.Topic;
 import com.google.cloud.pubsublite.proto.Topic.PartitionConfig.Capacity;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import com.google.protobuf.ByteString;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Deque;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -55,11 +57,12 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.FlatMapElements;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.joda.time.Duration;
 import org.junit.After;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -68,12 +71,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @RunWith(JUnit4.class)
+@Ignore("https://issues.apache.org/jira/browse/BEAM-12908")
 public class ReadWriteIT {
   private static final Logger LOG = LoggerFactory.getLogger(ReadWriteIT.class);
   private static final CloudZone ZONE = CloudZone.parse("us-central1-b");
   private static final int MESSAGE_COUNT = 90;
 
-  @Rule public transient TestPubsubSignal signal = TestPubsubSignal.create();
   @Rule public transient TestPipeline pipeline = TestPipeline.create();
 
   private static ProjectId getProject(PipelineOptions options) {
@@ -205,21 +208,28 @@ public class ReadWriteIT {
         "dedupeMessages", PubsubLiteIO.deduplicate(UuidDeduplicationOptions.newBuilder().build()));
   }
 
-  public static SimpleFunction<SequencedMessage, Integer> extractIds() {
-    return new SimpleFunction<SequencedMessage, Integer>() {
-      @Override
-      public Integer apply(SequencedMessage input) {
-        return Integer.parseInt(input.getMessage().getData().toStringUtf8());
-      }
-    };
+  // This static out of band communication is needed to retain serializability.
+  @GuardedBy("ReadWriteIT.class")
+  private static final List<SequencedMessage> received = new ArrayList<>();
+
+  private static synchronized void addMessageReceived(SequencedMessage message) {
+    received.add(message);
   }
 
-  public static SerializableFunction<Set<Integer>, Boolean> testIds() {
-    return ids -> {
-      LOG.info("Ids are: {}", ids);
-      Set<Integer> target = IntStream.range(0, MESSAGE_COUNT).boxed().collect(Collectors.toSet());
-      return target.equals(ids);
-    };
+  private static synchronized List<SequencedMessage> getTestQuickstartReceived() {
+    return ImmutableList.copyOf(received);
+  }
+
+  private static PTransform<PCollection<? extends SequencedMessage>, PCollection<Void>>
+      collectTestQuickstart() {
+    return MapElements.via(
+        new SimpleFunction<SequencedMessage, Void>() {
+          @Override
+          public Void apply(SequencedMessage input) {
+            addMessageReceived(input);
+            return null;
+          }
+        });
   }
 
   @Test
@@ -250,17 +260,37 @@ public class ReadWriteIT {
     // Read some messages. They should be deduplicated by the time we see them, so there should be
     // exactly numMessages, one for every index in [0,MESSAGE_COUNT).
     PCollection<SequencedMessage> messages = readMessages(subscription, pipeline);
-    PCollection<Integer> ids = messages.apply(MapElements.via(extractIds()));
-    ids.apply("PubsubSignalTest", signal.signalSuccessWhen(BigEndianIntegerCoder.of(), testIds()));
-    pipeline.apply(signal.signalStart());
-    PipelineResult job = pipeline.run();
+    messages.apply("messageReceiver", collectTestQuickstart());
+    pipeline.run();
     LOG.info("Running!");
-    signal.waitForSuccess(Duration.standardMinutes(5));
-    // A runner may not support cancel
-    try {
-      job.cancel();
-    } catch (UnsupportedOperationException exc) {
-      // noop
+    for (int round = 0; round < 120; ++round) {
+      Thread.sleep(1000);
+      Map<Integer, Integer> receivedCounts = new HashMap<>();
+      for (SequencedMessage message : getTestQuickstartReceived()) {
+        int id = Integer.parseInt(message.getMessage().getData().toStringUtf8());
+        receivedCounts.put(id, receivedCounts.getOrDefault(id, 0) + 1);
+      }
+      LOG.info("Performing comparison round {}.\n", round);
+      boolean done = true;
+      List<Integer> missing = new ArrayList<>();
+      for (int id = 0; id < MESSAGE_COUNT; id++) {
+        int idCount = receivedCounts.getOrDefault(id, 0);
+        if (idCount == 0) {
+          missing.add(id);
+          done = false;
+        }
+        if (idCount > 1) {
+          fail(String.format("Failed to deduplicate message with id %s.", id));
+        }
+      }
+      LOG.info("Still messing messages: {}.\n", missing);
+      if (done) {
+        return;
+      }
     }
+    fail(
+        String.format(
+            "Failed to receive all messages after 2 minutes. Received %s messages.",
+            getTestQuickstartReceived().size()));
   }
 }