You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by na...@apache.org on 2021/01/29 21:10:18 UTC

[ignite-extensions] branch master updated: IGNITE-14009 Fixes flaky PubSubStreamerSelfTest. (#38)

This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c13d57  IGNITE-14009 Fixes flaky PubSubStreamerSelfTest. (#38)
7c13d57 is described below

commit 7c13d5764a7a588f2220bae4bbb3619c43a14146
Author: Mikhail Petrov <32...@users.noreply.github.com>
AuthorDate: Sat Jan 30 00:10:09 2021 +0300

    IGNITE-14009 Fixes flaky PubSubStreamerSelfTest. (#38)
---
 .../ignite/stream/pubsub/MockPubSubServer.java     | 22 ++++++++++++++++++----
 .../stream/pubsub/PubSubStreamerSelfTest.java      |  6 +++---
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
index 1cbd010..2c6acfc 100644
--- a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
+++ b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
@@ -34,9 +34,10 @@ import com.google.pubsub.v1.ReceivedMessage;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 
 import org.jetbrains.annotations.NotNull;
 import org.mockito.Mockito;
@@ -71,11 +72,14 @@ class MockPubSubServer {
     /** */
     public static final int MESSAGES_PER_REQUEST = 10;
 
+    /** Time to wait for the message in milliseconds. */
+    private static final long MSG_WAIT_TIMEOUT = 1_000L;
+
     /** */
     private final Map<String, Publisher> publishers = new HashMap<>();
 
     /** */
-    private final Queue<PubsubMessage> blockingQueue = new LinkedBlockingDeque<>();
+    private final BlockingDeque<PubsubMessage> blockingQueue = new LinkedBlockingDeque<>();
 
     public SubscriberStubSettings createSubscriberStub() throws IOException {
         CredentialsProvider credentialsProvider = NoCredentialsProvider.create();
@@ -136,8 +140,18 @@ class MockPubSubServer {
     private void pullMessages(ClientCall.Listener<PullResponse> listener, Metadata metadata) {
         PullResponse.Builder pullResponse = PullResponse.newBuilder();
 
-        for(int i = 0; i < MESSAGES_PER_REQUEST; i++) {
-            pullResponse.addReceivedMessages(ReceivedMessage.newBuilder().mergeMessage(blockingQueue.remove()).build());
+        try {
+            for (int i = 0; i < MESSAGES_PER_REQUEST; i++) {
+                PubsubMessage msg = blockingQueue.poll(MSG_WAIT_TIMEOUT, TimeUnit.MILLISECONDS);
+
+                if (msg == null)
+                    break;
+
+                pullResponse.addReceivedMessages(ReceivedMessage.newBuilder().mergeMessage(msg).build());
+            }
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
         }
 
         listener.onMessage(pullResponse.build());
diff --git a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java
index 300efe3..b286b94 100644
--- a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java
+++ b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java
@@ -180,9 +180,6 @@ public class PubSubStreamerSelfTest {
 
             pubSubStmr.setSingleTupleExtractor(singleTupleExtractor());
 
-            // Start Pub/Sub streamer.
-            pubSubStmr.start();
-
             final CountDownLatch latch = new CountDownLatch(CNT);
 
             IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
@@ -210,6 +207,9 @@ public class PubSubStreamerSelfTest {
 
             ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
 
+            // Start Pub/Sub streamer.
+            pubSubStmr.start();
+
             // Checks all events successfully processed in 10 seconds.
             assertTrue("Failed to wait latch completion, still wait " + latch.getCount() + " events",
                 latch.await(10, TimeUnit.SECONDS));