You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/21 07:52:06 UTC
[incubator-pulsar] branch master updated: Fixed race condition in consumer event listener (#1818)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0355b14 Fixed race condition in consumer event listener (#1818)
0355b14 is described below
commit 0355b1449039bed2a3094a3968d04be8ae67b0ff
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon May 21 00:52:03 2018 -0700
Fixed race condition in consumer event listener (#1818)
### Motivation
Fixes #1614 #1617
The test was flaky because of a race condition in the dispatcher code. If a read is still pending when the active consumer is switched, the actual switch happens only after that read completes, but the dispatcher did not trigger the active/inactive notification at that point.
### Modifications
Ensure the active/inactive notification is sent after the read completes.
---
.../PersistentDispatcherSingleActiveConsumer.java | 5 +++-
.../broker/service/PersistentFailoverE2ETest.java | 27 ++++++++++++----------
2 files changed, 19 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 7050f1d..9ab7b87 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -38,7 +38,6 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
-import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -199,6 +198,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
entries.forEach(Entry::release);
cursor.rewind();
if (currentConsumer != null) {
+ notifyActiveConsumerChanged(currentConsumer);
readMoreEntries(currentConsumer);
}
} else {
@@ -458,6 +458,9 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Retrying read operation", name, c);
}
+ if (currentConsumer != c) {
+ notifyActiveConsumerChanged(currentConsumer);
+ }
readMoreEntries(currentConsumer);
} else {
log.info("[{}-{}] Skipping read retry: Current Consumer {}, havePendingRead {}", name, c,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 6c61734..3e293af 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -24,6 +24,9 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -53,9 +56,6 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
public class PersistentFailoverE2ETest extends BrokerTestBase {
@BeforeClass
@@ -70,7 +70,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
super.internalCleanup();
}
- private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 2000;
+ private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 100;
private static class TestConsumerStateEventListener implements ConsumerEventListener {
@@ -100,12 +100,16 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
}
private void verifyConsumerActive(TestConsumerStateEventListener listener, int partitionId) throws Exception {
- assertEquals(partitionId, listener.activeQueue.take().intValue());
+ Integer pid = listener.activeQueue.poll(10, TimeUnit.SECONDS);
+ assertNotNull(pid);
+ assertEquals(partitionId, pid.intValue());
assertNull(listener.inActiveQueue.poll());
}
private void verifyConsumerInactive(TestConsumerStateEventListener listener, int partitionId) throws Exception {
- assertEquals(partitionId, listener.inActiveQueue.take().intValue());
+ Integer pid = listener.inActiveQueue.poll(10, TimeUnit.SECONDS);
+ assertNotNull(pid);
+ assertEquals(partitionId, pid.intValue());
assertNull(listener.activeQueue.poll());
}
@@ -141,7 +145,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
// 1. two consumers on the same subscription
ConsumerBuilder<byte[]> consumerBulder1 = consumerBuilder.clone().consumerName("1")
- .consumerEventListener(listener1).acknowledgmentGroupTime(0, TimeUnit.SECONDS);
+ .consumerEventListener(listener1);
Consumer<byte[]> consumer1 = consumerBulder1.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("2").consumerEventListener(listener2)
.subscribe();
@@ -177,7 +181,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
// 3. consumer1 should have all the messages while consumer2 should have no messages
Message<byte[]> msg = null;
- Assert.assertNull(consumer2.receive(1, TimeUnit.SECONDS));
+ Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
for (int i = 0; i < numMsgs; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
@@ -222,7 +226,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
consumer2.acknowledge(msg);
}
- Assert.assertNull(consumer2.receive(1, TimeUnit.SECONDS));
+ Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
rolloverPerIntervalStats();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
@@ -250,7 +254,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
consumer1.acknowledge(msg);
}
- Assert.assertNull(consumer1.receive(1, TimeUnit.SECONDS));
+ Assert.assertNull(consumer1.receive(100, TimeUnit.MILLISECONDS));
rolloverPerIntervalStats();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
@@ -277,7 +281,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
verifyConsumerInactive(listener3, -1);
- Assert.assertNull(consumer3.receive(1, TimeUnit.SECONDS));
+ Assert.assertNull(consumer3.receive(100, TimeUnit.MILLISECONDS));
for (int i = 5; i < numMsgs; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
@@ -299,7 +303,6 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
// 9. unsubscribe allowed if there is a lone consumer
consumer1.close();
- Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
consumer2.close();
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
try {
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.