You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/20 15:06:12 UTC

[pulsar] 30/31: Fix potential to add duplicated consumer (#15051)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9dead563ad0218063461fb03d40618c5b09c9ab4
Author: fengyubiao <99...@qq.com>
AuthorDate: Wed May 18 18:41:08 2022 +0800

    Fix potential to add duplicated consumer (#15051)
    
    This change was prompted by issue https://github.com/apache/pulsar/issues/13787.
    While investigating the code, I found that if a client subscribes multiple times within a short period, the same dispatcher can end up with more than one consumer, as in the example below:
    ```
    for (long requestId = 1; requestId < 5; requestId++) {
      ByteBuf request1 = Commands.newSubscribe(topic, subscription, consumerId, requestId , getSubType(),
              priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
              conf.isReplicateSubscriptionState(),
              InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
              startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
              // Use the current epoch to subscribe.
              conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this));
      cnx.sendRequestWithId(request1, requestId).thenRun(() -> {});
    }
    ```
    
    The root cause is below snippet:
    https://github.com/apache/pulsar/blob/c2c05c49aff1ebc7b2b7a1d5bd547c33211e4479/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L994-L1021
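    Suppose consumer1's subscribe request arrives and is still pending when an identical request, consumer2 (same consumer ID as consumer1), arrives. Because `isDone()` is checked twice and consumer1's future can complete between the two checks, the handler for consumer2 may remove consumer1's future (line 1015) even though consumer1 is eventually added to the subscription successfully. Since consumer1's future was removed, a third identical request, consumer3, then also succeeds, so the same consumer is added to the subscription twice.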
    
    The consumer future should be removed (line 1015) only when `existingConsumerFuture` has completed exceptionally.
    
    Although the Java client does not exhibit this behavior, other clients might, so it is better to handle `subscribe` correctly on the broker side.
    
    Reorder the checks on the existing consumer future so that `isDone()` is evaluated only once, improving stability.
    
    (cherry picked from commit 7bf495aca70798257d79d370f33c3870f31815a9)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  45 ++++---
 .../pulsar/broker/service/ServerCnxTest.java       | 147 +++++++++++++++++++++
 .../TopicTransactionBufferRecoverTest.java         |   2 +-
 3 files changed, 175 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 10a4a2c412d..0c102a1198d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -992,32 +992,31 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 }
 
                 if (existingConsumerFuture != null) {
-                    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
-                        Consumer consumer = existingConsumerFuture.getNow(null);
-                        log.info("[{}] Consumer with the same id is already created:"
-                                 + " consumerId={}, consumer={}",
-                                 remoteAddress, consumerId, consumer);
-                        commandSender.sendSuccessResponse(requestId);
-                        return null;
-                    } else {
+                    if (!existingConsumerFuture.isDone()){
                         // There was an early request to create a consumer with same consumerId. This can happen
                         // when
                         // client timeout is lower the broker timeouts. We need to wait until the previous
                         // consumer
                         // creation request either complete or fails.
                         log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
-                                 + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
-                        ServerError error = null;
-                        if (!existingConsumerFuture.isDone()) {
-                            error = ServerError.ServiceNotReady;
-                        } else {
-                            error = getErrorCode(existingConsumerFuture);
-                            consumers.remove(consumerId, existingConsumerFuture);
-                        }
-                        commandSender.sendErrorResponse(requestId, error,
+                                + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
+                        commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                                 "Consumer is already present on the connection");
-                        return null;
+                    } else if (existingConsumerFuture.isCompletedExceptionally()){
+                        ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true,
+                                String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s",
+                                        remoteAddress, subscriptionName));
+                        consumers.remove(consumerId, existingConsumerFuture);
+                        commandSender.sendErrorResponse(requestId, error,
+                                "Consumer that failed is already present on the connection");
+                    } else {
+                        Consumer consumer = existingConsumerFuture.getNow(null);
+                        log.info("[{}] Consumer with the same id is already created:"
+                                        + " consumerId={}, consumer={}",
+                                remoteAddress, consumerId, consumer);
+                        commandSender.sendSuccessResponse(requestId);
                     }
+                    return null;
                 }
 
                 boolean createTopicIfDoesNotExist = forceTopicCreation
@@ -2711,6 +2710,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     }
 
     private <T> ServerError getErrorCode(CompletableFuture<T> future) {
+        return getErrorCodeWithErrorLog(future, false, null);
+    }
+
+    private <T> ServerError getErrorCodeWithErrorLog(CompletableFuture<T> future, boolean logIfError,
+                                                     String errorMessageIfLog) {
         ServerError error = ServerError.UnknownError;
         try {
             future.getNow(null);
@@ -2718,6 +2722,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             if (e.getCause() instanceof BrokerServiceException) {
                 error = BrokerServiceException.getClientErrorCode(e.getCause());
             }
+            if (logIfError){
+                String finalErrorMessage = StringUtils.isNotBlank(errorMessageIfLog)
+                        ? errorMessageIfLog : "Unknown Error";
+                log.error(finalErrorMessage, e);
+            }
         }
         return error;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 1772581baf5..5bef206a44d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -51,6 +51,8 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -107,10 +109,13 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Commands.ChecksumType;
 import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.zookeeper.ZooKeeper;
 import org.awaitility.Awaitility;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -1827,4 +1832,146 @@ public class ServerCnxTest {
 
         channel.finish();
     }
+
+    @Test
+    public void testNeverDelayConsumerFutureWhenNotFail() throws Exception {
+        // Mock the ServerCnx "consumers" map so every subscribe sees a pre-existing consumer future.
+        ConcurrentLongHashMap.Builder mapBuilder = Mockito.mock(ConcurrentLongHashMap.Builder.class);
+        Mockito.when(mapBuilder.expectedItems(Mockito.anyInt())).thenReturn(mapBuilder);
+        Mockito.when(mapBuilder.concurrencyLevel(Mockito.anyInt())).thenReturn(mapBuilder);
+        ConcurrentLongHashMap consumers = Mockito.mock(ConcurrentLongHashMap.class);
+        Mockito.when(mapBuilder.build()).thenReturn(consumers);
+        ArgumentCaptor<Long> ignoreArgumentCaptor = ArgumentCaptor.forClass(Long.class);
+        final ArgumentCaptor<CompletableFuture> deleteTimesMark = ArgumentCaptor.forClass(CompletableFuture.class);
+        Mockito.when(consumers.remove(ignoreArgumentCaptor.capture())).thenReturn(true);
+        Mockito.when(consumers.remove(ignoreArgumentCaptor.capture(), deleteTimesMark.capture())).thenReturn(true);
+        // case1: an existing consumer future that is already complete on the first 'isDone()' check.
+        // case2: an existing future that is pending on the first 'isDone()' check and completes on a later one.
+        // For why completion is simulated this way, see https://github.com/apache/pulsar/pull/15051
+        // Try delays of 1 to 3 'isDone()' calls; none of them may cause the valid future to be removed.
+        for (int delayTimes = 1; delayTimes <= 3; delayTimes++) {
+            // The anonymous future below needs an effectively-final copy of the loop counter.
+            final int futureWillDoneAfterDelayTimes = delayTimes;
+            final AtomicInteger futureCallTimes = new AtomicInteger();
+            final Consumer mockConsumer = Mockito.mock(Consumer.class);
+            CompletableFuture<Consumer> existingConsumerFuture = new CompletableFuture<Consumer>() {
+
+                private boolean completed;
+
+                // Reports "done" once 'isDone()' has been called futureWillDoneAfterDelayTimes times.
+                @Override
+                public boolean isDone() {
+                    if (completed) {
+                        return true;
+                    }
+                    int executeIsDoneCommandTimes = futureCallTimes.incrementAndGet();
+                    return executeIsDoneCommandTimes >= futureWillDoneAfterDelayTimes;
+                }
+
+                // Calling get() completes the future with the mock consumer.
+                @Override
+                public Consumer get() {
+                    completed = true;
+                    return mockConsumer;
+                }
+
+                // Calling get(timeout, unit) completes the future with the mock consumer.
+                @Override
+                public Consumer get(long timeout, TimeUnit unit) {
+                    completed = true;
+                    return mockConsumer;
+                }
+
+                // Calling getNow() completes the future with the mock consumer.
+                @Override
+                public Consumer getNow(Consumer ifAbsent) {
+                    completed = true;
+                    return mockConsumer;
+                }
+
+                @Override // this future never completes exceptionally
+                public boolean isCompletedExceptionally() {
+                    return false;
+                }
+            };
+            Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
+            // Subscribe while the existing future reports "done" only after the configured delay.
+            // See https://github.com/apache/pulsar/pull/15051 for the race this reproduces.
+            try (MockedStatic<ConcurrentLongHashMap> theMock = Mockito.mockStatic(ConcurrentLongHashMap.class)) {
+                // Inject the mocked consumers map into ServerCnx.
+                theMock.when(ConcurrentLongHashMap::newBuilder).thenReturn(mapBuilder);
+                // Reset the channels (serverChannel, clientChannel).
+                resetChannel();
+                setChannelConnected();
+                // Disable authentication and authorization checks.
+                doReturn(false).when(brokerService).isAuthenticationEnabled();
+                doReturn(false).when(brokerService).isAuthorizationEnabled();
+                // Send a subscribe; the mocked map reports the existing future for this consumer id.
+                ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
+                        successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
+                        "test" /* consumer name */, 0 /* avoid resetting cursor */);
+                channel.writeInbound(clientCommand);
+                Object responseObj = getResponse();
+                Predicate<Object> responseAssert = obj -> {
+                    if (obj instanceof CommandSuccess) {
+                        return true;
+                    }
+                    if (obj instanceof CommandError) {
+                        CommandError commandError = (CommandError) obj;
+                        return ServerError.ServiceNotReady == commandError.getError();
+                    }
+                    return false;
+                };
+                // The still-valid existing future must never be removed from the map.
+                assertFalse(deleteTimesMark.getAllValues().contains(existingConsumerFuture));
+                // The response is either success or ServiceNotReady, never another error.
+                assertTrue(responseAssert.test(responseObj));
+                // The server must not close the connection.
+                assertTrue(channel.isOpen());
+                channel.finish();
+            }
+        }
+        // case3: an existing consumer future that has already completed exceptionally.
+        CompletableFuture existingConsumerFuture = Mockito.mock(CompletableFuture.class);
+        Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
+        // The existing future reports "done" immediately.
+        Mockito.when(existingConsumerFuture.isDone()).thenReturn(true);
+        // Every synchronous accessor throws, as it would for a future that failed.
+        Mockito.when(existingConsumerFuture.get()).thenThrow(new NullPointerException());
+        Mockito.when(existingConsumerFuture.get(Mockito.anyLong(), Mockito.any())).
+                thenThrow(new NullPointerException());
+        Mockito.when(existingConsumerFuture.isCompletedExceptionally()).thenReturn(true);
+        Mockito.when(existingConsumerFuture.getNow(Mockito.any())).thenThrow(new NullPointerException());
+        try (MockedStatic<ConcurrentLongHashMap> theMock = Mockito.mockStatic(ConcurrentLongHashMap.class)) {
+            // Inject the mocked consumers map into ServerCnx.
+            theMock.when(ConcurrentLongHashMap::newBuilder).thenReturn(mapBuilder);
+            // Reset the channels (serverChannel, clientChannel).
+            resetChannel();
+            setChannelConnected();
+            // Disable authentication and authorization checks.
+            doReturn(false).when(brokerService).isAuthenticationEnabled();
+            doReturn(false).when(brokerService).isAuthorizationEnabled();
+            // Send a subscribe; the mocked map reports the failed future for this consumer id.
+            ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
+                    successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
+                    "test" /* consumer name */, 0 /* avoid resetting cursor */);
+            channel.writeInbound(clientCommand);
+            Object responseObj = getResponse();
+            Predicate<Object> responseAssert = obj -> {
+                if (obj instanceof CommandError) {
+                    CommandError commandError = (CommandError) obj;
+                    return ServerError.ServiceNotReady != commandError.getError();
+                }
+                return false;
+            };
+            // The response must be an error other than ServiceNotReady.
+            assertTrue(responseAssert.test(responseObj));
+            // The failed future must be removed from the map exactly once.
+            assertEquals(1L,
+                    deleteTimesMark.getAllValues().stream().filter(f -> f == existingConsumerFuture).count());
+            // The server must not close the connection.
+            assertTrue(channel.isOpen());
+            channel.finish();
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 351fe124852..9f6d4eb121b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -493,7 +493,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         doThrow(new RuntimeException("test")).when(reader).hasMoreEvents();
         // check reader close topic
         checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
-                transactionBufferSnapshotService, originalTopic, field, producer);
+                transactionBufferSnapshotService, originalTopic, field);
         doReturn(true).when(reader).hasMoreEvents();
 
         // mock reader can't read snapshot fail throw PulsarClientException