Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/05/25 07:57:16 UTC

[pulsar] branch branch-2.9 updated: Fix potential to add duplicated consumer (#15051)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new f569fa47307 Fix potential to add duplicated consumer (#15051)
f569fa47307 is described below

commit f569fa4730717878bea748af42f28c800706e4e2
Author: fengyubiao <99...@qq.com>
AuthorDate: Wed May 18 18:41:08 2022 +0800

    Fix potential to add duplicated consumer (#15051)
    
    ### Motivation
    This change addresses https://github.com/apache/pulsar/issues/13787.
    Digging into the code, I found that if a client sends several subscribe requests within a short period, more than one consumer can end up on the same dispatcher, for example:
    ```
    for (long requestId = 1; requestId < 5; requestId++) {
      ByteBuf request1 = Commands.newSubscribe(topic, subscription, consumerId, requestId , getSubType(),
              priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
              conf.isReplicateSubscriptionState(),
              InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
              startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
              // Use the current epoch to subscribe.
              conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this));
      cnx.sendRequestWithId(request1, requestId).thenRun(() -> {});
    }
    ```
    
    The root cause is in the snippet below:
    https://github.com/apache/pulsar/blob/c2c05c49aff1ebc7b2b7a1d5bd547c33211e4479/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L994-L1021
    If the subscribe request for consumer1 arrives and has not yet completed, and an identical request (consumer2, the same consumer as consumer1) then arrives, the broker may remove the earlier consumer1 future (line 1015) even though consumer1 is eventually added to the subscription successfully. When another identical request (consumer3) arrives, it also succeeds, so the same consumer is added twice.
    
    The consumer should be removed (line 1015) only when `existingConsumerFuture` has completed exceptionally.
    
    Although the Java client cannot trigger this behavior, other clients might. It is therefore better to handle `subscribe` correctly on the broker side.
    
    ### Modifications
    
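    Reorder the checks on `existingConsumerFuture`: reply with `ServiceNotReady` while it is still pending, remove it and return its error only if it completed exceptionally, and reply with success otherwise.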
    
    (cherry picked from commit 7bf495aca70798257d79d370f33c3870f31815a9)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  45 ++++---
 .../pulsar/broker/service/ServerCnxTest.java       | 147 +++++++++++++++++++++
 2 files changed, 174 insertions(+), 18 deletions(-)
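
The reordered check described in the commit message can be sketched in isolation. This is a minimal illustration, not the broker code: the class `DuplicateSubscribeSketch`, the `Reply` enum, and the use of `String` in place of a real consumer object are all hypothetical, and a plain `ConcurrentHashMap` stands in for the connection's consumer map. The point it tries to show is that testing `isDone()` first means the future's state is already final by the time `isCompletedExceptionally()` is consulted, so a future that completes successfully between two checks can no longer fall into the removal branch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a connection's consumer registry (not ServerCnx itself).
final class DuplicateSubscribeSketch {

    // Hypothetical reply codes, named after the three outcomes in the patch.
    enum Reply { PROCEED, SERVICE_NOT_READY, PREVIOUS_ATTEMPT_FAILED, SUCCESS }

    // consumerId -> future of the consumer being created (String is a placeholder type).
    private final ConcurrentMap<Long, CompletableFuture<String>> consumers = new ConcurrentHashMap<>();

    Reply onSubscribe(long consumerId, CompletableFuture<String> newFuture) {
        CompletableFuture<String> existing = consumers.putIfAbsent(consumerId, newFuture);
        if (existing == null) {
            return Reply.PROCEED;              // first request: continue creating the consumer
        }
        if (!existing.isDone()) {
            return Reply.SERVICE_NOT_READY;    // earlier request still pending: keep its entry
        }
        if (existing.isCompletedExceptionally()) {
            // Only a failed attempt is removed, and only if the map still holds that exact future.
            consumers.remove(consumerId, existing);
            return Reply.PREVIOUS_ATTEMPT_FAILED;
        }
        return Reply.SUCCESS;                  // consumer already created: acknowledge, keep entry
    }

    boolean isRegistered(long consumerId) {
        return consumers.containsKey(consumerId);
    }

    public static void main(String[] args) {
        DuplicateSubscribeSketch cnx = new DuplicateSubscribeSketch();
        CompletableFuture<String> first = new CompletableFuture<>();
        System.out.println(cnx.onSubscribe(1L, first));                     // PROCEED
        System.out.println(cnx.onSubscribe(1L, new CompletableFuture<>())); // SERVICE_NOT_READY
        first.complete("consumer-1");
        System.out.println(cnx.onSubscribe(1L, new CompletableFuture<>())); // SUCCESS
    }
}
```

In this sketch a duplicate request never evicts a pending or successfully completed entry, so a later request with the same id cannot register the consumer a second time. A client that receives the not-ready reply is assumed to retry later.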

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 03a6a80aed3..3c420fe43b3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -982,32 +982,31 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                         consumerFuture);
 
                 if (existingConsumerFuture != null) {
-                    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
-                        Consumer consumer = existingConsumerFuture.getNow(null);
-                        log.info("[{}] Consumer with the same id is already created:"
-                                 + " consumerId={}, consumer={}",
-                                 remoteAddress, consumerId, consumer);
-                        commandSender.sendSuccessResponse(requestId);
-                        return null;
-                    } else {
+                    if (!existingConsumerFuture.isDone()){
                         // There was an early request to create a consumer with same consumerId. This can happen
                         // when
                         // client timeout is lower the broker timeouts. We need to wait until the previous
                         // consumer
                         // creation request either complete or fails.
                         log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
-                                 + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
-                        ServerError error = null;
-                        if (!existingConsumerFuture.isDone()) {
-                            error = ServerError.ServiceNotReady;
-                        } else {
-                            error = getErrorCode(existingConsumerFuture);
-                            consumers.remove(consumerId, existingConsumerFuture);
-                        }
-                        commandSender.sendErrorResponse(requestId, error,
+                                + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
+                        commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                                 "Consumer is already present on the connection");
-                        return null;
+                    } else if (existingConsumerFuture.isCompletedExceptionally()){
+                        ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true,
+                                String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s",
+                                        remoteAddress, subscriptionName));
+                        consumers.remove(consumerId, existingConsumerFuture);
+                        commandSender.sendErrorResponse(requestId, error,
+                                "Consumer that failed is already present on the connection");
+                    } else {
+                        Consumer consumer = existingConsumerFuture.getNow(null);
+                        log.info("[{}] Consumer with the same id is already created:"
+                                        + " consumerId={}, consumer={}",
+                                remoteAddress, consumerId, consumer);
+                        commandSender.sendSuccessResponse(requestId);
                     }
+                    return null;
                 }
 
                 boolean createTopicIfDoesNotExist = forceTopicCreation
@@ -2597,6 +2596,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     }
 
     private <T> ServerError getErrorCode(CompletableFuture<T> future) {
+        return getErrorCodeWithErrorLog(future, false, null);
+    }
+
+    private <T> ServerError getErrorCodeWithErrorLog(CompletableFuture<T> future, boolean logIfError,
+                                                     String errorMessageIfLog) {
         ServerError error = ServerError.UnknownError;
         try {
             future.getNow(null);
@@ -2604,6 +2608,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             if (e.getCause() instanceof BrokerServiceException) {
                 error = BrokerServiceException.getClientErrorCode(e.getCause());
             }
+            if (logIfError){
+                String finalErrorMessage = StringUtils.isNotBlank(errorMessageIfLog)
+                        ? errorMessageIfLog : "Unknown Error";
+                log.error(finalErrorMessage, e);
+            }
         }
         return error;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 1a5efb19f3e..d2dac88efe8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -51,6 +51,8 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -107,11 +109,14 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Commands.ChecksumType;
 import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.ZooKeeper;
 import org.awaitility.Awaitility;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -1807,4 +1812,146 @@ public class ServerCnxTest {
 
         channel.finish();
     }
+
+    @Test
+    public void testNeverDelayConsumerFutureWhenNotFail() throws Exception{
+        // Mock ServerCnx.field: consumers
+        ConcurrentLongHashMap.Builder mapBuilder = Mockito.mock(ConcurrentLongHashMap.Builder.class);
+        Mockito.when(mapBuilder.expectedItems(Mockito.anyInt())).thenReturn(mapBuilder);
+        Mockito.when(mapBuilder.concurrencyLevel(Mockito.anyInt())).thenReturn(mapBuilder);
+        ConcurrentLongHashMap consumers = Mockito.mock(ConcurrentLongHashMap.class);
+        Mockito.when(mapBuilder.build()).thenReturn(consumers);
+        ArgumentCaptor<Long> ignoreArgumentCaptor = ArgumentCaptor.forClass(Long.class);
+        final ArgumentCaptor<CompletableFuture> deleteTimesMark = ArgumentCaptor.forClass(CompletableFuture.class);
+        Mockito.when(consumers.remove(ignoreArgumentCaptor.capture())).thenReturn(true);
+        Mockito.when(consumers.remove(ignoreArgumentCaptor.capture(), deleteTimesMark.capture())).thenReturn(true);
+        // case1: exists existingConsumerFuture, already complete or delay done after execute 'isDone()' many times
+        // case2: exists existingConsumerFuture, delay complete after execute 'isDone()' many times
+        // Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051
+        // Try a delay of 3 stages. The simulation is successful after repeated judgments.
+        for(AtomicInteger futureWillDoneAfterDelayTimes = new AtomicInteger(1);
+                                            futureWillDoneAfterDelayTimes.intValue() <= 3;
+                                            futureWillDoneAfterDelayTimes.incrementAndGet()){
+            final AtomicInteger futureCallTimes = new AtomicInteger();
+            final Consumer mockConsumer = Mockito.mock(Consumer.class);
+            CompletableFuture existingConsumerFuture = new CompletableFuture<Consumer>(){
+
+                private boolean complete;
+
+                // delay complete after execute 'isDone()' many times
+                @Override
+                public boolean isDone() {
+                    if (complete) {
+                        return true;
+                    }
+                    int executeIsDoneCommandTimes = futureCallTimes.incrementAndGet();
+                    return executeIsDoneCommandTimes >= futureWillDoneAfterDelayTimes.intValue();
+                }
+
+                // if trig "getNow()", then complete
+                @Override
+                public Consumer get(){
+                    complete = true;
+                    return mockConsumer;
+                }
+
+                // if trig "get()", then complete
+                @Override
+                public Consumer get(long timeout, TimeUnit unit){
+                    complete = true;
+                    return mockConsumer;
+                }
+
+                // if trig "get()", then complete
+                @Override
+                public Consumer getNow(Consumer ifAbsent){
+                    complete = true;
+                    return mockConsumer;
+                }
+
+                // never fail
+                public boolean isCompletedExceptionally(){
+                    return false;
+                }
+            };
+            Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
+            // do test: delay complete after execute 'isDone()' many times
+            // Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051
+            try (MockedStatic<ConcurrentLongHashMap> theMock = Mockito.mockStatic(ConcurrentLongHashMap.class)) {
+                // Inject consumers to ServerCnx
+                theMock.when(ConcurrentLongHashMap::newBuilder).thenReturn(mapBuilder);
+                // reset channels( serverChannel, clientChannel )
+                resetChannel();
+                setChannelConnected();
+                // auth check disable
+                doReturn(false).when(brokerService).isAuthenticationEnabled();
+                doReturn(false).when(brokerService).isAuthorizationEnabled();
+                // do subscribe
+                ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
+                        successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
+                        "test" /* consumer name */, 0 /* avoid reseting cursor */);
+                channel.writeInbound(clientCommand);
+                Object responseObj = getResponse();
+                Predicate<Object> responseAssert = obj -> {
+                    if (responseObj instanceof CommandSuccess) {
+                        return true;
+                    }
+                    if (responseObj instanceof CommandError) {
+                        CommandError commandError = (CommandError) responseObj;
+                        return ServerError.ServiceNotReady == commandError.getError();
+                    }
+                    return false;
+                };
+                // assert no consumer-delete event occur
+                assertFalse(deleteTimesMark.getAllValues().contains(existingConsumerFuture));
+                // assert without another error occur
+                assertTrue(responseAssert.test(responseAssert));
+                // Server will not close the connection
+                assertTrue(channel.isOpen());
+                channel.finish();
+            }
+        }
+        // case3: exists existingConsumerFuture, already complete and exception
+        CompletableFuture existingConsumerFuture = Mockito.mock(CompletableFuture.class);
+        Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
+        // make consumerFuture delay finish
+        Mockito.when(existingConsumerFuture.isDone()).thenReturn(true);
+        // when sync get return, future will return success value.
+        Mockito.when(existingConsumerFuture.get()).thenThrow(new NullPointerException());
+        Mockito.when(existingConsumerFuture.get(Mockito.anyLong(), Mockito.any())).
+                thenThrow(new NullPointerException());
+        Mockito.when(existingConsumerFuture.isCompletedExceptionally()).thenReturn(true);
+        Mockito.when(existingConsumerFuture.getNow(Mockito.any())).thenThrow(new NullPointerException());
+        try (MockedStatic<ConcurrentLongHashMap> theMock = Mockito.mockStatic(ConcurrentLongHashMap.class)) {
+            // Inject consumers to ServerCnx
+            theMock.when(ConcurrentLongHashMap::newBuilder).thenReturn(mapBuilder);
+            // reset channels( serverChannel, clientChannel )
+            resetChannel();
+            setChannelConnected();
+            // auth check disable
+            doReturn(false).when(brokerService).isAuthenticationEnabled();
+            doReturn(false).when(brokerService).isAuthorizationEnabled();
+            // do subscribe
+            ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
+                    successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
+                    "test" /* consumer name */, 0 /* avoid reseting cursor */);
+            channel.writeInbound(clientCommand);
+            Object responseObj = getResponse();
+            Predicate<Object> responseAssert = obj -> {
+                if (responseObj instanceof CommandError) {
+                    CommandError commandError = (CommandError) responseObj;
+                    return ServerError.ServiceNotReady != commandError.getError();
+                }
+                return false;
+            };
+            // assert error response
+            assertTrue(responseAssert.test(responseAssert));
+            // assert consumer-delete event occur
+            assertEquals(1L,
+                    deleteTimesMark.getAllValues().stream().filter(f -> f == existingConsumerFuture).count());
+            // Server will not close the connection
+            assertTrue(channel.isOpen());
+            channel.finish();
+        }
+    }
 }