Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/05/29 03:20:59 UTC

[pulsar] branch master updated: Fix race in MessageDispatchThrottlingTest (#7073)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0068bd9  Fix race in MessageDispatchThrottlingTest (#7073)
0068bd9 is described below

commit 0068bd97b23cf9b3bec25f225b2dd2e133c5f2a5
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu May 28 20:20:47 2020 -0700

    Fix race in MessageDispatchThrottlingTest (#7073)
    
    There was a race in testRelativeMessageRateLimitingThrottling, where
    the test sent and received a message to warm up the broker, and
    then tested whether messages could get through with a 1/s dispatch
    rate but a high produce rate.
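The relative throttling this test exercises can be illustrated with a small arithmetic sketch. The test's own comment describes relative throttling as allowing dispatch up to (publish-rate + dispatch-rate). `RelativeDispatchBudget`, `allowedPerPeriod`, and the 100 msg/s publish rate are hypothetical. This is a simplified model, not Pulsar's dispatch rate limiter.

```java
/**
 * Hypothetical arithmetic model of relative dispatch throttling (not Pulsar code):
 * per rate period the dispatcher may send up to dispatch-rate + publish-rate messages.
 */
final class RelativeDispatchBudget {
    /** Messages that may be dispatched in one period under relative throttling. */
    static long allowedPerPeriod(long dispatchRate, long publishRate) {
        return dispatchRate + publishRate;
    }

    public static void main(String[] args) {
        long dispatchRate = 1;  // 1 msg/s, as configured in the test
        long publishRate = 100; // hypothetical burst produced within one second
        long backlog = 100;     // hypothetical number of messages waiting to be dispatched

        // Absolute throttling caps dispatch at the configured rate alone.
        long absoluteDrained = Math.min(backlog, dispatchRate);
        // Relative throttling adds the observed publish rate to the budget.
        long relativeDrained = Math.min(backlog, allowedPerPeriod(dispatchRate, publishRate));

        System.out.println("absolute: " + absoluteDrained + " of " + backlog + " in the first second");
        System.out.println("relative: " + relativeDrained + " of " + backlog + " in the first second");
    }
}
```

With only the 1/s dispatch rate, one message gets through per second. With the publish rate added to the budget, the whole burst can drain at once, which is what the test expects to observe.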
    
    The race was triggered when the initial warmup message caused a backoff,
    because the warmup receive had already used up the 1/s permit quota.
    This happened because, after the dispatcher has read messages from the
    managed ledger, it calls #readMoreEntries. If this call to
    readMoreEntries occurs before the test starts publishing, the backoff
    is hit.
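The timing of the race can be sketched with a simplified model. Only the 1 second backoff and the 100 ms and 1.1 s limits come from this commit. The class `WarmupBackoffRace`, its method, the 20 ms delivery time, and the example timestamps are hypothetical assumptions. The model shows why a re-read that lands before publishing starts pushes delivery well past 100 ms but still inside 1.1 s.

```java
/**
 * Hypothetical timing model of the warmup race (a simplification, not Pulsar code).
 * All times are milliseconds after the warmup message was received.
 */
final class WarmupBackoffRace {
    static final long BACKOFF_MS = 1000; // read backoff length, per the test comment
    static final long DELIVERY_MS = 20;  // assumed time to dispatch the burst once reads resume

    /**
     * Elapsed time from the start of publishing until the whole burst is delivered.
     * Measuring from publish start is an assumption; the test's clock starts just
     * before its receive loop.
     */
    static long elapsedUntilDelivered(long readMoreEntriesAt, long publishStartAt) {
        // Re-reading before anything is published: the relative budget is only the
        // 1/s dispatch rate, which the warmup message already used, so the read backs off.
        boolean backoff = readMoreEntriesAt < publishStartAt;
        long readsAt = backoff ? readMoreEntriesAt + BACKOFF_MS : readMoreEntriesAt;
        return readsAt + DELIVERY_MS - publishStartAt;
    }

    public static void main(String[] args) {
        long publishStartAt = 5;                                  // hypothetical
        long lucky = elapsedUntilDelivered(10, publishStartAt);   // re-read after publishing began
        long racy = elapsedUntilDelivered(1, publishStartAt);     // re-read before publishing began
        for (long elapsed : new long[] {lucky, racy}) {
            System.out.printf("delivered after %dms: 100ms timeout %s, 1100ms deadline %s%n",
                    elapsed, elapsed <= 100 ? "ok" : "fails", elapsed <= 1100 ? "ok" : "fails");
        }
    }
}
```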
    
    The solution is to change the way we validate the functionality at the
    end of the test. The test really wants to verify that, if a lot of
    messages are being produced, we can keep up. We should hit the backoff
    at most once. So instead of calling receive with a 100ms timeout, set
    a maximum deadline by which all the messages must have been delivered.
    This timeout must account for a possible backoff, so it is set to 1.1
    seconds.
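The deadline-based check can be sketched outside Pulsar. This is an illustrative variant, not the committed test code. A `BlockingQueue` stands in for the consumer so the example runs without a broker (an assumption), and `DeadlineReceive` and `receiveAll` are hypothetical names. The committed loop passes the full 1.1 s to every `receive` call and then checks the elapsed time. This sketch instead passes only the time remaining on the shared deadline, so a single slow call cannot overrun it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of an overall-deadline receive loop. A BlockingQueue stands in for the
 * Pulsar consumer (hypothetical setup, for illustration only).
 */
final class DeadlineReceive {
    /** Returns how many of {@code expected} messages arrived before the deadline. */
    static <T> int receiveAll(BlockingQueue<T> source, int expected, long maxTimeNanos) {
        long deadline = System.nanoTime() + maxTimeNanos;
        int received = 0;
        while (received < expected) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // deadline passed: stop instead of waiting another full timeout
            }
            T msg;
            try {
                // Wait at most the time left on the shared deadline, not a fresh timeout.
                msg = source.poll(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break;
            }
            if (msg == null) {
                break; // timed out waiting for the next message
            }
            received++;
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Simulate a dispatcher that backs off for ~1s, then delivers 10 messages at once.
        Thread dispatcher = new Thread(() -> {
            try {
                Thread.sleep(1000);
                for (int i = 0; i < 10; i++) {
                    queue.put("msg-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        dispatcher.start();
        int received = receiveAll(queue, 10, TimeUnit.MILLISECONDS.toNanos(1100));
        dispatcher.join();
        System.out.println("received " + received + " of 10 within the deadline");
    }
}
```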
    
    Co-authored-by: Ivan Kelly <ik...@splunk.com>
---
 .../client/api/MessageDispatchThrottlingTest.java   | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index d75fcfd..2373029 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -984,7 +984,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
 
     /**
      * It verifies that relative throttling at least dispatch messages as publish-rate.
-     * 
+     *
      * @param subscription
      * @throws Exception
      */
@@ -1041,13 +1041,28 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         int totalReceived = 0;
         // Relative throttling will let it drain immediately because it allows to dispatch = (publish-rate +
         // dispatch-rate)
+        // All messages should be received in the next 1.1 seconds. 100 millis should be enough for the actual delivery,
+        // while the previous call to receive above may have thrown the dispatcher into a read backoff, as nothing
+        // may have been produced before the call to readNext() and the permits for dispatch had already been used.
+        // The backoff is 1 second, so we expect to be able to receive all messages in at most 1.1 seconds, while the
+        // basic dispatch rate limit would only allow one message in that time.
+        long maxTimeNanos = TimeUnit.MILLISECONDS.toNanos(1100);
+        long startNanos = System.nanoTime();
         for (int i = 0; i < numProducedMessages; i++) {
-            Message<byte[]> msg = consumer.receive();
+            Message<byte[]> msg = consumer.receive((int)maxTimeNanos, TimeUnit.NANOSECONDS);
             totalReceived++;
             assertNotNull(msg);
+            long elapsedNanos = System.nanoTime() - startNanos;
+            if (elapsedNanos > maxTimeNanos) { // fail fast
+                log.info("Test has only received {} messages in {}ms, {} expected",
+                         totalReceived, TimeUnit.NANOSECONDS.toMillis(elapsedNanos), numProducedMessages);
+                Assert.fail("Messages not received in time");
+            }
+            log.info("Received {}-{}", msg.getMessageId(), new String(msg.getData()));
         }
-
         Assert.assertEquals(totalReceived, numProducedMessages);
+        long elapsedNanos = System.nanoTime() - startNanos;
+        Assert.assertTrue(elapsedNanos < maxTimeNanos);
 
         consumer.close();
         producer.close();