You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2021/10/08 22:07:36 UTC
[beam] 01/01: Revert "[BEAM-12908] Change to use PubsubSignal for information propagation so this works on dataflow"
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch revert-15607-pubsub-signal-psl-it
in repository https://gitbox.apache.org/repos/asf/beam.git
commit cb09eb8f314e44023897ddbf49b0c9df8cb6f7e5
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Fri Oct 8 15:06:18 2021 -0700
Revert "[BEAM-12908] Change to use PubsubSignal for information propagation so this works on dataflow"
---
.../beam/sdk/io/gcp/pubsublite/ReadWriteIT.java | 88 +++++++++++++++-------
1 file changed, 59 insertions(+), 29 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
index bd15310..80c362a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.pubsublite;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.junit.Assert.fail;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
@@ -35,18 +36,19 @@ import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.cloud.pubsublite.proto.Topic.PartitionConfig.Capacity;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.ByteString;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Deque;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -55,11 +57,12 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.junit.After;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -68,12 +71,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(JUnit4.class)
+@Ignore("https://issues.apache.org/jira/browse/BEAM-12908")
public class ReadWriteIT {
private static final Logger LOG = LoggerFactory.getLogger(ReadWriteIT.class);
private static final CloudZone ZONE = CloudZone.parse("us-central1-b");
private static final int MESSAGE_COUNT = 90;
- @Rule public transient TestPubsubSignal signal = TestPubsubSignal.create();
@Rule public transient TestPipeline pipeline = TestPipeline.create();
private static ProjectId getProject(PipelineOptions options) {
@@ -205,21 +208,28 @@ public class ReadWriteIT {
"dedupeMessages", PubsubLiteIO.deduplicate(UuidDeduplicationOptions.newBuilder().build()));
}
- public static SimpleFunction<SequencedMessage, Integer> extractIds() {
- return new SimpleFunction<SequencedMessage, Integer>() {
- @Override
- public Integer apply(SequencedMessage input) {
- return Integer.parseInt(input.getMessage().getData().toStringUtf8());
- }
- };
+ // This static out of band communication is needed to retain serializability.
+ @GuardedBy("ReadWriteIT.class")
+ private static final List<SequencedMessage> received = new ArrayList<>();
+
+ private static synchronized void addMessageReceived(SequencedMessage message) {
+ received.add(message);
}
- public static SerializableFunction<Set<Integer>, Boolean> testIds() {
- return ids -> {
- LOG.info("Ids are: {}", ids);
- Set<Integer> target = IntStream.range(0, MESSAGE_COUNT).boxed().collect(Collectors.toSet());
- return target.equals(ids);
- };
+ private static synchronized List<SequencedMessage> getTestQuickstartReceived() {
+ return ImmutableList.copyOf(received);
+ }
+
+ private static PTransform<PCollection<? extends SequencedMessage>, PCollection<Void>>
+ collectTestQuickstart() {
+ return MapElements.via(
+ new SimpleFunction<SequencedMessage, Void>() {
+ @Override
+ public Void apply(SequencedMessage input) {
+ addMessageReceived(input);
+ return null;
+ }
+ });
}
@Test
@@ -250,17 +260,37 @@ public class ReadWriteIT {
// Read some messages. They should be deduplicated by the time we see them, so there should be
// exactly numMessages, one for every index in [0,MESSAGE_COUNT).
PCollection<SequencedMessage> messages = readMessages(subscription, pipeline);
- PCollection<Integer> ids = messages.apply(MapElements.via(extractIds()));
- ids.apply("PubsubSignalTest", signal.signalSuccessWhen(BigEndianIntegerCoder.of(), testIds()));
- pipeline.apply(signal.signalStart());
- PipelineResult job = pipeline.run();
+ messages.apply("messageReceiver", collectTestQuickstart());
+ pipeline.run();
LOG.info("Running!");
- signal.waitForSuccess(Duration.standardMinutes(5));
- // A runner may not support cancel
- try {
- job.cancel();
- } catch (UnsupportedOperationException exc) {
- // noop
+ for (int round = 0; round < 120; ++round) {
+ Thread.sleep(1000);
+ Map<Integer, Integer> receivedCounts = new HashMap<>();
+ for (SequencedMessage message : getTestQuickstartReceived()) {
+ int id = Integer.parseInt(message.getMessage().getData().toStringUtf8());
+ receivedCounts.put(id, receivedCounts.getOrDefault(id, 0) + 1);
+ }
+ LOG.info("Performing comparison round {}.\n", round);
+ boolean done = true;
+ List<Integer> missing = new ArrayList<>();
+ for (int id = 0; id < MESSAGE_COUNT; id++) {
+ int idCount = receivedCounts.getOrDefault(id, 0);
+ if (idCount == 0) {
+ missing.add(id);
+ done = false;
+ }
+ if (idCount > 1) {
+ fail(String.format("Failed to deduplicate message with id %s.", id));
+ }
+ }
+ LOG.info("Still messing messages: {}.\n", missing);
+ if (done) {
+ return;
+ }
}
+ fail(
+ String.format(
+ "Failed to receive all messages after 2 minutes. Received %s messages.",
+ getTestQuickstartReceived().size()));
}
}