You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/07/01 13:28:08 UTC
[pulsar] branch branch-2.7 updated: Make Consumer thread safe and
lock-free (#10352)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 2720863 Make Consumer thread safe and lock-free (#10352)
2720863 is described below
commit 272086375a58368cff992baa8b1c9528bcb4e69a
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu May 6 07:08:25 2021 +0800
Make Consumer thread safe and lock-free (#10352)
Lock-free solution for https://github.com/apache/pulsar/pull/10240
(cherry picked from commit def1932033f4e9c99ff25df20887f282a338718e)
---
pulsar-client/pom.xml | 7 ++
.../apache/pulsar/client/impl/ConsumerImpl.java | 76 +++++++++-------------
.../pulsar/client/impl/ClientTestFixtures.java | 1 -
.../pulsar/client/impl/ConsumerImplTest.java | 26 +++++++-
.../apache/pulsar/client/impl/ReaderImplTest.java | 20 +++++-
5 files changed, 81 insertions(+), 49 deletions(-)
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 5688739..64d7a2d 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -162,6 +162,13 @@
<version>${skyscreamer.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 479ddce..9143aed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -49,6 +49,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -131,8 +132,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final int receiverQueueRefillThreshold;
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
private final UnAckedMessageTracker unAckedMessageTracker;
private final AcknowledgmentsGroupingTracker acknowledgmentsGroupingTracker;
private final NegativeAcksTracker negativeAcksTracker;
@@ -191,6 +190,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final ConcurrentLongHashMap<OpForAckCallBack> ackRequests;
private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
+ private final ExecutorService internalPinnedExecutor;
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
@@ -257,6 +257,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
this.pendingChunckedMessageUuidQueue = new GrowableArrayBlockingQueue<>();
this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis();
this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();
+ this.internalPinnedExecutor = client.getInternalExecutorService();
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStatsRecorderImpl(client, conf, this);
@@ -415,25 +416,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
- Message<T> message = null;
- try {
- lock.writeLock().lock();
- message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
+ internalPinnedExecutor.execute(() -> {
+ Message<T> message = incomingMessages.poll();
if (message == null) {
pendingReceives.add(result);
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- result.completeExceptionally(e);
- } finally {
- lock.writeLock().unlock();
- }
-
- if (message != null) {
- messageProcessed(message);
- result.complete(beforeConsume(message));
- }
+ if (message != null) {
+ messageProcessed(message);
+ result.complete(beforeConsume(message));
+ }
+ });
return result;
}
@@ -478,8 +471,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
CompletableFuture<Messages<T>> result = cancellationHandler.createFuture();
- try {
- lock.writeLock().lock();
+
+ internalPinnedExecutor.execute(() -> {
if (pendingBatchReceives == null) {
pendingBatchReceives = Queues.newConcurrentLinkedQueue();
}
@@ -501,9 +494,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
pendingBatchReceives.add(opBatchReceive);
cancellationHandler.setCancelAction(() -> pendingBatchReceives.remove(opBatchReceive));
}
- } finally {
- lock.writeLock().unlock();
- }
+ });
return result;
}
@@ -1111,15 +1102,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
private void failPendingReceive() {
- lock.readLock().lock();
- try {
+ internalPinnedExecutor.execute(() -> {
if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
failPendingReceives(this.pendingReceives);
failPendingBatchReceives(this.pendingBatchReceives);
}
- } finally {
- lock.readLock().unlock();
- }
+ });
}
void activeConsumerChanged(boolean isActive) {
@@ -1218,13 +1206,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
uncompressedPayload.release();
msgMetadata.recycle();
- lock.readLock().lock();
- try {
- // Enqueue the message so that it can be retrieved when application calls receive()
- // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
- // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
- if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
- possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(), Collections.singletonList(message));
+ // Enqueue the message so that it can be retrieved when application calls receive()
+ // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
+ // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
+ internalPinnedExecutor.execute(() -> {
+ if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
+ redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
+ possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
+ Collections.singletonList(message));
}
if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
@@ -1233,9 +1222,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
notifyPendingBatchReceivedCallBack();
}
}
- } finally {
- lock.readLock().unlock();
- }
+ });
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount, ackSet, uncompressedPayload, messageId, cnx);
@@ -1243,7 +1230,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
uncompressedPayload.release();
msgMetadata.recycle();
}
+ internalPinnedExecutor.execute(()
+ -> tryTriggerListener());
+ }
+
+ private void tryTriggerListener() {
if (listener != null) {
triggerListener();
}
@@ -1449,8 +1441,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
if (possibleToDeadLetter != null) {
possibleToDeadLetter.add(message);
}
- lock.readLock().lock();
- try {
+ internalPinnedExecutor.execute(() -> {
if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message)) {
@@ -1458,11 +1449,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
notifyPendingBatchReceivedCallBack();
}
}
- } finally {
- lock.readLock().unlock();
- }
- singleMessagePayload.release();
- singleMessageMetadataBuilder.recycle();
+ singleMessagePayload.release();
+ });
}
if (ackBitSet != null) {
ackBitSet.recycle();
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index 8bb7bbc..0adb165 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.util.Timer;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.mockito.Mockito;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index dec7c2a..c7aad50 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -25,9 +25,12 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -38,21 +41,26 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
+import org.awaitility.Awaitility;
import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class ConsumerImplTest {
- private final ExecutorProvider executorProvider = new ExecutorProvider(1, new DefaultThreadFactory("ConsumerImplTest"));
+ private ExecutorProvider executorProvider = new ExecutorProvider(1, new DefaultThreadFactory("ConsumerImplTest"));
private ConsumerImpl<byte[]> consumer;
private ConsumerConfigurationData consumerConf;
+ private ExecutorService executorService;
@BeforeMethod
public void setUp() {
consumerConf = new ConsumerConfigurationData<>();
PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock();
+ executorService = Executors.newSingleThreadExecutor();
+ when(client.getInternalExecutorService()).thenReturn(executorService);
ClientConfigurationData clientConf = client.getConfiguration();
clientConf.setOperationTimeoutMs(100);
clientConf.setStatsIntervalSeconds(0);
@@ -66,6 +74,18 @@ public class ConsumerImplTest {
consumer.setState(HandlerState.State.Ready);
}
+ @AfterMethod(alwaysRun = true)
+ public void cleanup() {
+ if (executorProvider != null) {
+ executorProvider.shutdownNow();
+ executorProvider = null;
+ }
+ if (executorService != null) {
+ executorService.shutdownNow();
+ executorService = null;
+ }
+ }
+
@Test(invocationTimeOut = 1000)
public void testNotifyPendingReceivedCallback_EmptyQueueNotThrowsException() {
consumer.notifyPendingReceivedCallback(null, null);
@@ -152,7 +172,7 @@ public class ConsumerImplTest {
public void testReceiveAsyncCanBeCancelled() {
// given
CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
- Assert.assertEquals(consumer.peekPendingReceive(), future);
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(consumer.peekPendingReceive(), future));
// when
future.cancel(true);
// then
@@ -163,7 +183,7 @@ public class ConsumerImplTest {
public void testBatchReceiveAsyncCanBeCancelled() {
// given
CompletableFuture<Messages<byte[]>> future = consumer.batchReceiveAsync();
- Assert.assertTrue(consumer.hasPendingBatchReceive());
+ Awaitility.await().untilAsserted(() -> Assert.assertTrue(consumer.hasPendingBatchReceive()));
// when
future.cancel(true);
// then
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
index 6e587f7..d0c4023 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
@@ -22,32 +22,50 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
public class ReaderImplTest {
ReaderImpl<byte[]> reader;
+ private ExecutorService executorService;
@BeforeMethod
void setupReader() {
PulsarClientImpl mockedClient = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx();
ReaderConfigurationData<byte[]> readerConfiguration = new ReaderConfigurationData<>();
readerConfiguration.setTopicName("topicName");
+ executorService = Executors.newSingleThreadExecutor();
+ when(mockedClient.getInternalExecutorService()).thenReturn(executorService);
CompletableFuture<Consumer<byte[]>> consumerFuture = new CompletableFuture<>();
reader = new ReaderImpl<>(mockedClient, readerConfiguration, ClientTestFixtures.createMockedExecutorProvider(),
consumerFuture, Schema.BYTES);
}
+ @AfterMethod
+ public void clean() {
+ if (executorService != null) {
+ executorService.shutdownNow();
+ executorService = null;
+ }
+ }
+
@Test
void shouldSupportCancellingReadNextAsync() {
// given
CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
- assertNotNull(reader.getConsumer().peekPendingReceive());
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(reader.getConsumer().peekPendingReceive());
+ });
// when
future.cancel(false);