You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/21 07:52:06 UTC

[incubator-pulsar] branch master updated: Fixed race condition in consumer event listener (#1818)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0355b14  Fixed race condition in consumer event listener (#1818)
0355b14 is described below

commit 0355b1449039bed2a3094a3968d04be8ae67b0ff
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon May 21 00:52:03 2018 -0700

    Fixed race condition in consumer event listener (#1818)
    
    ### Motivation
    
    Fixes #1614 #1617
    
    The flaky test was actually because of a race condition in the dispatcher code. If there is a pending read when the consumer is switched, the actual switch happens after the read completes, but the dispatcher wouldn't trigger the notification at that point.
    
    ### Modifications
    
    Ensure the active/inactive notification is sent after the read completes.
---
 .../PersistentDispatcherSingleActiveConsumer.java  |  5 +++-
 .../broker/service/PersistentFailoverE2ETest.java  | 27 ++++++++++++----------
 2 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 7050f1d..9ab7b87 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -38,7 +38,6 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
-import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -199,6 +198,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
             entries.forEach(Entry::release);
             cursor.rewind();
             if (currentConsumer != null) {
+                notifyActiveConsumerChanged(currentConsumer);
                 readMoreEntries(currentConsumer);
             }
         } else {
@@ -458,6 +458,9 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
                         if (log.isDebugEnabled()) {
                             log.debug("[{}-{}] Retrying read operation", name, c);
                         }
+                        if (currentConsumer != c) {
+                            notifyActiveConsumerChanged(currentConsumer);
+                        }
                         readMoreEntries(currentConsumer);
                     } else {
                         log.info("[{}-{}] Skipping read retry: Current Consumer {}, havePendingRead {}", name, c,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 6c61734..3e293af 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -24,6 +24,9 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -53,9 +56,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 public class PersistentFailoverE2ETest extends BrokerTestBase {
 
     @BeforeClass
@@ -70,7 +70,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
         super.internalCleanup();
     }
 
-    private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 2000;
+    private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 100;
 
     private static class TestConsumerStateEventListener implements ConsumerEventListener {
 
@@ -100,12 +100,16 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
     }
 
     private void verifyConsumerActive(TestConsumerStateEventListener listener, int partitionId) throws Exception {
-        assertEquals(partitionId, listener.activeQueue.take().intValue());
+        Integer pid = listener.activeQueue.poll(10, TimeUnit.SECONDS);
+        assertNotNull(pid);
+        assertEquals(partitionId, pid.intValue());
         assertNull(listener.inActiveQueue.poll());
     }
 
     private void verifyConsumerInactive(TestConsumerStateEventListener listener, int partitionId) throws Exception {
-        assertEquals(partitionId, listener.inActiveQueue.take().intValue());
+        Integer pid = listener.inActiveQueue.poll(10, TimeUnit.SECONDS);
+        assertNotNull(pid);
+        assertEquals(partitionId, pid.intValue());
         assertNull(listener.activeQueue.poll());
     }
 
@@ -141,7 +145,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
 
         // 1. two consumers on the same subscription
         ConsumerBuilder<byte[]> consumerBulder1 = consumerBuilder.clone().consumerName("1")
-                .consumerEventListener(listener1).acknowledgmentGroupTime(0, TimeUnit.SECONDS);
+                .consumerEventListener(listener1);
         Consumer<byte[]> consumer1 = consumerBulder1.subscribe();
         Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("2").consumerEventListener(listener2)
                 .subscribe();
@@ -177,7 +181,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
 
         // 3. consumer1 should have all the messages while consumer2 should have no messages
         Message<byte[]> msg = null;
-        Assert.assertNull(consumer2.receive(1, TimeUnit.SECONDS));
+        Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
         for (int i = 0; i < numMsgs; i++) {
             msg = consumer1.receive(1, TimeUnit.SECONDS);
             Assert.assertNotNull(msg);
@@ -222,7 +226,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
             Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
             consumer2.acknowledge(msg);
         }
-        Assert.assertNull(consumer2.receive(1, TimeUnit.SECONDS));
+        Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
 
         rolloverPerIntervalStats();
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
@@ -250,7 +254,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
             Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
             consumer1.acknowledge(msg);
         }
-        Assert.assertNull(consumer1.receive(1, TimeUnit.SECONDS));
+        Assert.assertNull(consumer1.receive(100, TimeUnit.MILLISECONDS));
 
         rolloverPerIntervalStats();
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
@@ -277,7 +281,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
 
         verifyConsumerInactive(listener3, -1);
 
-        Assert.assertNull(consumer3.receive(1, TimeUnit.SECONDS));
+        Assert.assertNull(consumer3.receive(100, TimeUnit.MILLISECONDS));
         for (int i = 5; i < numMsgs; i++) {
             msg = consumer1.receive(1, TimeUnit.SECONDS);
             Assert.assertNotNull(msg);
@@ -299,7 +303,6 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
 
         // 9. unsubscribe allowed if there is a lone consumer
         consumer1.close();
-        Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
         consumer2.close();
         Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
         try {

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.