You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/28 07:07:57 UTC

[pulsar] branch branch-2.8 updated: [fix][broker][branch-2.8] Fix potential to add duplicated consumer (#16826)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new a3515aba8d0 [fix][broker][branch-2.8] Fix potential to add duplicated consumer (#16826)
a3515aba8d0 is described below

commit a3515aba8d0ffcb2e4ecc4e7ad36dcd1b252f249
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu Jul 28 15:07:51 2022 +0800

    [fix][broker][branch-2.8] Fix potential to add duplicated consumer (#16826)
    
    ### Motivation
    
    see #15051
    
    There were conflicts when cherry-picking PR #15051 (branch-2.8 has no `mock static` support), so I created a separate PR to fix branch-2.8.
    
    ### Modifications
    
    Swap the execution order of duplicate validation and maximum validation
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  47 ++++---
 .../pulsar/broker/service/ServerCnxTest.java       | 140 ++++++++++++++++++++-
 2 files changed, 164 insertions(+), 23 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5aeefff05dd..1ac9bf86562 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -976,34 +976,31 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 consumerFuture);
 
                         if (existingConsumerFuture != null) {
-                            if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
-                                Consumer consumer = existingConsumerFuture.getNow(null);
-                                log.info("[{}] Consumer with the same id is already created:"
-                                         + " consumerId={}, consumer={}",
-                                         remoteAddress, consumerId, consumer);
-                                commandSender.sendSuccessResponse(requestId);
-                                return null;
-                            } else {
-                                // There was an early request to create a consumer with same consumerId. This can happen
+                            if (!existingConsumerFuture.isDone()){
                                 // when
                                 // client timeout is lower the broker timeouts. We need to wait until the previous
                                 // consumer
                                 // creation request either complete or fails.
                                 log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
-                                         + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
-                                ServerError error = null;
-                                if (!existingConsumerFuture.isDone()) {
-                                    error = ServerError.ServiceNotReady;
-                                } else {
-                                    error = getErrorCode(existingConsumerFuture);
-                                    consumers.remove(consumerId, existingConsumerFuture);
-                                }
-                                commandSender.sendErrorResponse(requestId, error,
+                                        + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
+                                commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                                         "Consumer is already present on the connection");
-                                return null;
+                            } else if (existingConsumerFuture.isCompletedExceptionally()){
+                                ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true,
+                                        String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s",
+                                                remoteAddress, subscriptionName));
+                                consumers.remove(consumerId, existingConsumerFuture);
+                                commandSender.sendErrorResponse(requestId, error,
+                                        "Consumer that failed is already present on the connection");
+                            } else {
+                                Consumer consumer = existingConsumerFuture.getNow(null);
+                                log.info("[{}] Consumer with the same id is already created:"
+                                                + " consumerId={}, consumer={}",
+                                        remoteAddress, consumerId, consumer);
+                                commandSender.sendSuccessResponse(requestId);
                             }
+                            return null;
                         }
-
                         boolean createTopicIfDoesNotExist = forceTopicCreation
                                 && service.isAllowAutoTopicCreation(topicName.toString());
 
@@ -2486,6 +2483,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     }
 
     private <T> ServerError getErrorCode(CompletableFuture<T> future) {
+        return getErrorCodeWithErrorLog(future, false, null);
+    }
+
+    private <T> ServerError getErrorCodeWithErrorLog(CompletableFuture<T> future, boolean logIfError,
+                                                     String errorMessageIfLog) {
         ServerError error = ServerError.UnknownError;
         try {
             future.getNow(null);
@@ -2493,6 +2495,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             if (e.getCause() instanceof BrokerServiceException) {
                 error = BrokerServiceException.getClientErrorCode(e.getCause());
             }
+            if (logIfError){
+                String finalErrorMessage = StringUtils.isNotBlank(errorMessageIfLog)
+                        ? errorMessageIfLog : "Unknown Error";
+                log.error(finalErrorMessage, e);
+            }
         }
         return error;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index ff9c188a3f7..7e7654af089 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -53,8 +53,9 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -95,7 +96,6 @@ import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.api.proto.AuthMethod;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
-import org.apache.pulsar.common.api.proto.CommandConnect;
 import org.apache.pulsar.common.api.proto.CommandConnected;
 import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
@@ -105,7 +105,6 @@ import org.apache.pulsar.common.api.proto.CommandSendReceipt;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.CommandSuccess;
-import org.apache.pulsar.common.api.proto.EncryptionKeys;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.ServerError;
@@ -113,13 +112,16 @@ import org.apache.pulsar.common.api.proto.BaseCommand.Type;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.ZooKeeper;
 import org.awaitility.Awaitility;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.powermock.reflect.Whitebox;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -1848,4 +1850,136 @@ public class ServerCnxTest {
 
         channel.finish();
     }
+
+    @Test
+    public void testNeverDelayConsumerFutureWhenNotFail() throws Exception{
+        // Mock ServerCnx.field: consumers
+        ConcurrentLongHashMap consumers = Mockito.mock(ConcurrentLongHashMap.class);
+        ArgumentCaptor<Long> ignoreArgumentCaptor = ArgumentCaptor.forClass(Long.class);
+        final ArgumentCaptor<CompletableFuture> deleteTimesMark = ArgumentCaptor.forClass(CompletableFuture.class);
+        Mockito.when(consumers.remove(ignoreArgumentCaptor.capture())).thenReturn(true);
+        Mockito.when(consumers.remove(ignoreArgumentCaptor.capture(), deleteTimesMark.capture())).thenReturn(true);
+        // case1: exists existingConsumerFuture, already complete or delay done after execute 'isDone()' many times
+        // case2: exists existingConsumerFuture, delay complete after execute 'isDone()' many times
+        // Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051
+        // Try a delay of 3 stages. The simulation is successful after repeated judgments.
+        for (AtomicInteger futureWillDoneAfterDelayTimes = new AtomicInteger(1);
+                                            futureWillDoneAfterDelayTimes.intValue() <= 3;
+                                            futureWillDoneAfterDelayTimes.incrementAndGet()){
+            final AtomicInteger futureCallTimes = new AtomicInteger();
+            final Consumer mockConsumer = Mockito.mock(Consumer.class);
+            CompletableFuture existingConsumerFuture = new CompletableFuture<Consumer>(){
+
+                private boolean complete;
+
+                // delay complete after execute 'isDone()' many times
+                @Override
+                public boolean isDone() {
+                    if (complete) {
+                        return true;
+                    }
+                    int executeIsDoneCommandTimes = futureCallTimes.incrementAndGet();
+                    return executeIsDoneCommandTimes >= futureWillDoneAfterDelayTimes.intValue();
+                }
+
+                // calling "get()" marks the future as complete
+                @Override
+                public Consumer get(){
+                    complete = true;
+                    return mockConsumer;
+                }
+
+                // if trig "get()", then complete
+                @Override
+                public Consumer get(long timeout, TimeUnit unit){
+                    complete = true;
+                    return mockConsumer;
+                }
+
+                // calling "getNow()" marks the future as complete
+                @Override
+                public Consumer getNow(Consumer ifAbsent){
+                    complete = true;
+                    return mockConsumer;
+                }
+
+                // never fail
+                public boolean isCompletedExceptionally(){
+                    return false;
+                }
+            };
+            Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
+            // do test: delay complete after execute 'isDone()' many times
+            // Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051
+            // reset channels( serverChannel, clientChannel )
+            resetChannel();
+            Whitebox.setInternalState(serverCnx, "consumers", consumers);
+            setChannelConnected();
+            // auth check disable
+            doReturn(false).when(brokerService).isAuthenticationEnabled();
+            doReturn(false).when(brokerService).isAuthorizationEnabled();
+            // do subscribe
+            ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
+                    successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
+                    "test" /* consumer name */, 0 /* avoid reseting cursor */);
+            channel.writeInbound(clientCommand);
+            Object responseObj = getResponse();
+            Predicate<Object> responseAssert = obj -> {
+                if (responseObj instanceof CommandSuccess) {
+                    return true;
+                }
+                if (responseObj instanceof CommandError) {
+                    CommandError commandError = (CommandError) responseObj;
+                    return ServerError.ServiceNotReady == commandError.getError();
+                }
+                return false;
+            };
+            // assert no consumer-delete event occur
+            assertFalse(deleteTimesMark.getAllValues().contains(existingConsumerFuture));
+            // assert without another error occur
+            assertTrue(responseAssert.test(responseAssert));
+            // Server will not close the connection
+            assertTrue(channel.isOpen());
+            channel.finish();
+        }
+        // case3: exists existingConsumerFuture, already complete and exception
+        CompletableFuture existingConsumerFuture = Mockito.mock(CompletableFuture.class);
+        Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
+        // the existing consumer future reports that it is already done
+        Mockito.when(existingConsumerFuture.isDone()).thenReturn(true);
+        // every attempt to read its result throws, i.e. it completed exceptionally
+        Mockito.when(existingConsumerFuture.get()).thenThrow(new NullPointerException());
+        Mockito.when(existingConsumerFuture.get(Mockito.anyLong(), Mockito.any())).
+                thenThrow(new NullPointerException());
+        Mockito.when(existingConsumerFuture.isCompletedExceptionally()).thenReturn(true);
+        Mockito.when(existingConsumerFuture.getNow(Mockito.any())).thenThrow(new NullPointerException());
+        // reset channels( serverChannel, clientChannel )
+        resetChannel();
+        Whitebox.setInternalState(serverCnx, "consumers", consumers);
+        setChannelConnected();
+        // auth check disable
+        doReturn(false).when(brokerService).isAuthenticationEnabled();
+        doReturn(false).when(brokerService).isAuthorizationEnabled();
+        // do subscribe
+        ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
+                successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
+                "test" /* consumer name */, 0 /* avoid reseting cursor */);
+        channel.writeInbound(clientCommand);
+        Object responseObj = getResponse();
+        Predicate<Object> responseAssert = obj -> {
+            if (responseObj instanceof CommandError) {
+                CommandError commandError = (CommandError) responseObj;
+                return ServerError.ServiceNotReady != commandError.getError();
+            }
+            return false;
+        };
+        // assert error response
+        assertTrue(responseAssert.test(responseAssert));
+        // assert consumer-delete event occur
+        assertEquals(1L,
+                deleteTimesMark.getAllValues().stream().filter(f -> f == existingConsumerFuture).count());
+        // Server will not close the connection
+        assertTrue(channel.isOpen());
+        channel.finish();
+    }
 }