You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/25 06:03:22 UTC
[pulsar] branch master updated: [tests] improve
DispatcherBlockConsumerTest#testBrokerSubscriptionRecovery (#2643)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1eb0e67 [tests] improve DispatcherBlockConsumerTest#testBrokerSubscriptionRecovery (#2643)
1eb0e67 is described below
commit 1eb0e670399f9171572dfbeb2c24b87183050bf2
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Mon Sep 24 23:03:17 2018 -0700
[tests] improve DispatcherBlockConsumerTest#testBrokerSubscriptionRecovery (#2643)
*Motivation*
DispatcherBlockConsumerTest#testBrokerSubscriptionRecovery is flaky.
```
Stacktrace
java.lang.AssertionError: expected [true] but found [false]
at org.testng.Assert.fail(Assert.java:96)
at org.testng.Assert.failNotEquals(Assert.java:776)
at org.testng.Assert.assertTrue(Assert.java:44)
at org.testng.Assert.assertTrue(Assert.java:54)
at org.apache.pulsar.client.api.DispatcherBlockConsumerTest.testBrokerSubscriptionRecovery(DispatcherBlockConsumerTest.java:651)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
at org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
The problem is `Consumer.acknowledge` doesn't guarantee a message is acked, it only guarantees an ack request is sent back to the broker.
*Changes*
Update the validation logic to check `unackMsgs` are redelivered.
---
.../org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index d8f0cf7..727ce62 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -647,8 +647,10 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
}
receivedMsgs.add(new String(msg.getData()));
}
- receivedMsgs.removeAll(unackMsgs);
- assertTrue(receivedMsgs.isEmpty());
+ // there is no guarantee when a messages is acknowledged when consumer.acknowledge is called.
+ // consumer.acknowledge only guarantees that an ack request is sent to the wire. so we can
+ // only check all unackMsgs will be redelivered.
+ unackMsgs.forEach(msg -> assertTrue(receivedMsgs.contains(msg)));
}
/**