You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/25 06:03:22 UTC

[pulsar] branch master updated: [tests] improve DispatcherBlockConsumerTest#testBrokerSubscriptionRecovery (#2643)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1eb0e67  [tests] improve DispatcherBlockConsumerTest#testBrokerSubscriptionRecovery (#2643)
1eb0e67 is described below

commit 1eb0e670399f9171572dfbeb2c24b87183050bf2
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Mon Sep 24 23:03:17 2018 -0700

    [tests] improve DispatcherBlockConsumerTest#testBrokerSubscriptionRecovery (#2643)
    
    *Motivation*
    
    DispatcherBlockConsumerTest#testBrokerSubscriptionRecovery is flaky.
    
    ```
    Stacktrace
    java.lang.AssertionError: expected [true] but found [false]
    	at org.testng.Assert.fail(Assert.java:96)
    	at org.testng.Assert.failNotEquals(Assert.java:776)
    	at org.testng.Assert.assertTrue(Assert.java:44)
    	at org.testng.Assert.assertTrue(Assert.java:54)
    	at org.apache.pulsar.client.api.DispatcherBlockConsumerTest.testBrokerSubscriptionRecovery(DispatcherBlockConsumerTest.java:651)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
    	at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
    	at org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    ```
    
    The problem is `Consumer.acknowledge` doesn't guarantee a message is acked, it only guarantees an ack request is sent back to the broker.
    
    *Changes*
    
    Update the validation logic to check `unackMsgs` are redelivered.
---
 .../org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java   | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index d8f0cf7..727ce62 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -647,8 +647,10 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
             }
             receivedMsgs.add(new String(msg.getData()));
         }
-        receivedMsgs.removeAll(unackMsgs);
-        assertTrue(receivedMsgs.isEmpty());
+        // there is no guarantee when a messages is acknowledged when consumer.acknowledge is called.
+        // consumer.acknowledge only guarantees that an ack request is sent to the wire. so we can
+        // only check all unackMsgs will be redelivered.
+        unackMsgs.forEach(msg -> assertTrue(receivedMsgs.contains(msg)));
     }
 
     /**