You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/07/01 13:28:08 UTC

[pulsar] branch branch-2.7 updated: Make Consumer thread safe and lock-free (#10352)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 2720863  Make Consumer thread safe and lock-free (#10352)
2720863 is described below

commit 272086375a58368cff992baa8b1c9528bcb4e69a
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu May 6 07:08:25 2021 +0800

    Make Consumer thread safe and lock-free (#10352)
    
    Lock-free solution for https://github.com/apache/pulsar/pull/10240
    
    (cherry picked from commit def1932033f4e9c99ff25df20887f282a338718e)
---
 pulsar-client/pom.xml                              |  7 ++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 76 +++++++++-------------
 .../pulsar/client/impl/ClientTestFixtures.java     |  1 -
 .../pulsar/client/impl/ConsumerImplTest.java       | 26 +++++++-
 .../apache/pulsar/client/impl/ReaderImplTest.java  | 20 +++++-
 5 files changed, 81 insertions(+), 49 deletions(-)

diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 5688739..64d7a2d 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -162,6 +162,13 @@
       <version>${skyscreamer.version}</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 479ddce..9143aed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -49,6 +49,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -131,8 +132,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     private final int receiverQueueRefillThreshold;
 
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
     private final UnAckedMessageTracker unAckedMessageTracker;
     private final AcknowledgmentsGroupingTracker acknowledgmentsGroupingTracker;
     private final NegativeAcksTracker negativeAcksTracker;
@@ -191,6 +190,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     private final ConcurrentLongHashMap<OpForAckCallBack> ackRequests;
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
+    private final ExecutorService internalPinnedExecutor;
 
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
                                                String topic,
@@ -257,6 +257,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         this.pendingChunckedMessageUuidQueue = new GrowableArrayBlockingQueue<>();
         this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis();
         this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();
+        this.internalPinnedExecutor = client.getInternalExecutorService();
 
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStatsRecorderImpl(client, conf, this);
@@ -415,25 +416,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     protected CompletableFuture<Message<T>> internalReceiveAsync() {
         CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
         CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
-        Message<T> message = null;
-        try {
-            lock.writeLock().lock();
-            message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
+        internalPinnedExecutor.execute(() -> {
+            Message<T> message = incomingMessages.poll();
             if (message == null) {
                 pendingReceives.add(result);
                 cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
             }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            result.completeExceptionally(e);
-        } finally {
-            lock.writeLock().unlock();
-        }
-
-        if (message != null) {
-            messageProcessed(message);
-            result.complete(beforeConsume(message));
-        }
+            if (message != null) {
+                messageProcessed(message);
+                result.complete(beforeConsume(message));
+            }
+        });
 
         return result;
     }
@@ -478,8 +471,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
         CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
         CompletableFuture<Messages<T>> result = cancellationHandler.createFuture();
-        try {
-            lock.writeLock().lock();
+
+        internalPinnedExecutor.execute(() -> {
             if (pendingBatchReceives == null) {
                 pendingBatchReceives = Queues.newConcurrentLinkedQueue();
             }
@@ -501,9 +494,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 pendingBatchReceives.add(opBatchReceive);
                 cancellationHandler.setCancelAction(() -> pendingBatchReceives.remove(opBatchReceive));
             }
-        } finally {
-            lock.writeLock().unlock();
-        }
+        });
         return result;
     }
 
@@ -1111,15 +1102,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     }
 
     private void failPendingReceive() {
-        lock.readLock().lock();
-        try {
+        internalPinnedExecutor.execute(() -> {
             if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
                 failPendingReceives(this.pendingReceives);
                 failPendingBatchReceives(this.pendingBatchReceives);
             }
-        } finally {
-            lock.readLock().unlock();
-        }
+        });
     }
 
     void activeConsumerChanged(boolean isActive) {
@@ -1218,13 +1206,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             uncompressedPayload.release();
             msgMetadata.recycle();
 
-            lock.readLock().lock();
-            try {
-                // Enqueue the message so that it can be retrieved when application calls receive()
-                // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-                // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
-                if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
-                    possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(), Collections.singletonList(message));
+            // Enqueue the message so that it can be retrieved when application calls receive()
+            // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
+            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
+            internalPinnedExecutor.execute(() -> {
+                if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
+                        redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
+                    possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
+                            Collections.singletonList(message));
                 }
                 if (peekPendingReceive() != null) {
                     notifyPendingReceivedCallback(message, null);
@@ -1233,9 +1222,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                         notifyPendingBatchReceivedCallBack();
                     }
                 }
-            } finally {
-                lock.readLock().unlock();
-            }
+            });
         } else {
             // handle batch message enqueuing; uncompressed payload has all messages in batch
             receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount, ackSet, uncompressedPayload, messageId, cnx);
@@ -1243,7 +1230,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             uncompressedPayload.release();
             msgMetadata.recycle();
         }
+        internalPinnedExecutor.execute(()
+                -> tryTriggerListener());
 
+    }
+
+    private void tryTriggerListener() {
         if (listener != null) {
             triggerListener();
         }
@@ -1449,8 +1441,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 if (possibleToDeadLetter != null) {
                     possibleToDeadLetter.add(message);
                 }
-                lock.readLock().lock();
-                try {
+                internalPinnedExecutor.execute(() -> {
                     if (peekPendingReceive() != null) {
                         notifyPendingReceivedCallback(message, null);
                     } else if (enqueueMessageAndCheckBatchReceive(message)) {
@@ -1458,11 +1449,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                             notifyPendingBatchReceivedCallBack();
                         }
                     }
-                } finally {
-                    lock.readLock().unlock();
-                }
-                singleMessagePayload.release();
-                singleMessageMetadataBuilder.recycle();
+                    singleMessagePayload.release();
+                });
             }
             if (ackBitSet != null) {
                 ackBitSet.recycle();
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index 8bb7bbc..0adb165 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoop;
 import io.netty.util.Timer;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.mockito.Mockito;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index dec7c2a..c7aad50 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -25,9 +25,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -38,21 +41,26 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class ConsumerImplTest {
 
 
-    private final ExecutorProvider executorProvider = new ExecutorProvider(1, new DefaultThreadFactory("ConsumerImplTest"));
+    private ExecutorProvider executorProvider = new ExecutorProvider(1, new DefaultThreadFactory("ConsumerImplTest"));
     private ConsumerImpl<byte[]> consumer;
     private ConsumerConfigurationData consumerConf;
+    private ExecutorService executorService;
 
     @BeforeMethod
     public void setUp() {
         consumerConf = new ConsumerConfigurationData<>();
         PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock();
+        executorService = Executors.newSingleThreadExecutor();
+        when(client.getInternalExecutorService()).thenReturn(executorService);
         ClientConfigurationData clientConf = client.getConfiguration();
         clientConf.setOperationTimeoutMs(100);
         clientConf.setStatsIntervalSeconds(0);
@@ -66,6 +74,18 @@ public class ConsumerImplTest {
         consumer.setState(HandlerState.State.Ready);
     }
 
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() {
+        if (executorProvider != null) {
+            executorProvider.shutdownNow();
+            executorProvider = null;
+        }
+        if (executorService != null) {
+            executorService.shutdownNow();
+            executorService = null;
+        }
+    }
+
     @Test(invocationTimeOut = 1000)
     public void testNotifyPendingReceivedCallback_EmptyQueueNotThrowsException() {
         consumer.notifyPendingReceivedCallback(null, null);
@@ -152,7 +172,7 @@ public class ConsumerImplTest {
     public void testReceiveAsyncCanBeCancelled() {
         // given
         CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
-        Assert.assertEquals(consumer.peekPendingReceive(), future);
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(consumer.peekPendingReceive(), future));
         // when
         future.cancel(true);
         // then
@@ -163,7 +183,7 @@ public class ConsumerImplTest {
     public void testBatchReceiveAsyncCanBeCancelled() {
         // given
         CompletableFuture<Messages<byte[]>> future = consumer.batchReceiveAsync();
-        Assert.assertTrue(consumer.hasPendingBatchReceive());
+        Awaitility.await().untilAsserted(() -> Assert.assertTrue(consumer.hasPendingBatchReceive()));
         // when
         future.cancel(true);
         // then
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
index 6e587f7..d0c4023 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
@@ -22,32 +22,50 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 
 public class ReaderImplTest {
     ReaderImpl<byte[]> reader;
+    private ExecutorService executorService;
 
     @BeforeMethod
     void setupReader() {
         PulsarClientImpl mockedClient = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx();
         ReaderConfigurationData<byte[]> readerConfiguration = new ReaderConfigurationData<>();
         readerConfiguration.setTopicName("topicName");
+        executorService = Executors.newSingleThreadExecutor();
+        when(mockedClient.getInternalExecutorService()).thenReturn(executorService);
         CompletableFuture<Consumer<byte[]>> consumerFuture = new CompletableFuture<>();
         reader = new ReaderImpl<>(mockedClient, readerConfiguration, ClientTestFixtures.createMockedExecutorProvider(),
                 consumerFuture, Schema.BYTES);
     }
 
+    @AfterMethod
+    public void clean() {
+        if (executorService != null) {
+            executorService.shutdownNow();
+            executorService = null;
+        }
+    }
+
     @Test
     void shouldSupportCancellingReadNextAsync() {
         // given
         CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
-        assertNotNull(reader.getConsumer().peekPendingReceive());
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(reader.getConsumer().peekPendingReceive());
+        });
 
         // when
         future.cancel(false);