You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by na...@apache.org on 2021/01/29 21:10:18 UTC
[ignite-extensions] branch master updated: IGNITE-14009 Fixes flaky
PubSubStreamerSelfTest. (#38)
This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 7c13d57 IGNITE-14009 Fixes flaky PubSubStreamerSelfTest. (#38)
7c13d57 is described below
commit 7c13d5764a7a588f2220bae4bbb3619c43a14146
Author: Mikhail Petrov <32...@users.noreply.github.com>
AuthorDate: Sat Jan 30 00:10:09 2021 +0300
IGNITE-14009 Fixes flaky PubSubStreamerSelfTest. (#38)
---
.../ignite/stream/pubsub/MockPubSubServer.java | 22 ++++++++++++++++++----
.../stream/pubsub/PubSubStreamerSelfTest.java | 6 +++---
2 files changed, 21 insertions(+), 7 deletions(-)
diff --git a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
index 1cbd010..2c6acfc 100644
--- a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
+++ b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
@@ -34,9 +34,10 @@ import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import org.jetbrains.annotations.NotNull;
import org.mockito.Mockito;
@@ -71,11 +72,14 @@ class MockPubSubServer {
/** */
public static final int MESSAGES_PER_REQUEST = 10;
+ /** Time to wait for the message in milliseconds. */
+ private static final long MSG_WAIT_TIMEOUT = 1_000L;
+
/** */
private final Map<String, Publisher> publishers = new HashMap<>();
/** */
- private final Queue<PubsubMessage> blockingQueue = new LinkedBlockingDeque<>();
+ private final BlockingDeque<PubsubMessage> blockingQueue = new LinkedBlockingDeque<>();
public SubscriberStubSettings createSubscriberStub() throws IOException {
CredentialsProvider credentialsProvider = NoCredentialsProvider.create();
@@ -136,8 +140,18 @@ class MockPubSubServer {
private void pullMessages(ClientCall.Listener<PullResponse> listener, Metadata metadata) {
PullResponse.Builder pullResponse = PullResponse.newBuilder();
- for(int i = 0; i < MESSAGES_PER_REQUEST; i++) {
- pullResponse.addReceivedMessages(ReceivedMessage.newBuilder().mergeMessage(blockingQueue.remove()).build());
+ try {
+ for (int i = 0; i < MESSAGES_PER_REQUEST; i++) {
+ PubsubMessage msg = blockingQueue.poll(MSG_WAIT_TIMEOUT, TimeUnit.MILLISECONDS);
+
+ if (msg == null)
+ break;
+
+ pullResponse.addReceivedMessages(ReceivedMessage.newBuilder().mergeMessage(msg).build());
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
listener.onMessage(pullResponse.build());
diff --git a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java
index 300efe3..b286b94 100644
--- a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java
+++ b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java
@@ -180,9 +180,6 @@ public class PubSubStreamerSelfTest {
pubSubStmr.setSingleTupleExtractor(singleTupleExtractor());
- // Start Pub/Sub streamer.
- pubSubStmr.start();
-
final CountDownLatch latch = new CountDownLatch(CNT);
IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
@@ -210,6 +207,9 @@ public class PubSubStreamerSelfTest {
ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
+ // Start Pub/Sub streamer.
+ pubSubStmr.start();
+
// Checks all events successfully processed in 10 seconds.
assertTrue("Failed to wait latch completion, still wait " + latch.getCount() + " events",
latch.await(10, TimeUnit.SECONDS));