You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/28 07:07:57 UTC
[pulsar] branch branch-2.8 updated: [fix][broker][branch-2.8] Fix potential to add duplicated consumer (#16826)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new a3515aba8d0 [fix][broker][branch-2.8] Fix potential to add duplicated consumer (#16826)
a3515aba8d0 is described below
commit a3515aba8d0ffcb2e4ecc4e7ad36dcd1b252f249
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu Jul 28 15:07:51 2022 +0800
[fix][broker][branch-2.8] Fix potential to add duplicated consumer (#16826)
### Motivation
see #15051
There have conflicts when cherry-picking #15051 PR (branch 2.8 has no `mock static` support), so I created a separate PR to fix branch-2.8
### Modifications
Swap the execution order of duplicate validation and maximum validation
---
.../apache/pulsar/broker/service/ServerCnx.java | 47 ++++---
.../pulsar/broker/service/ServerCnxTest.java | 140 ++++++++++++++++++++-
2 files changed, 164 insertions(+), 23 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5aeefff05dd..1ac9bf86562 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -976,34 +976,31 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
consumerFuture);
if (existingConsumerFuture != null) {
- if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
- Consumer consumer = existingConsumerFuture.getNow(null);
- log.info("[{}] Consumer with the same id is already created:"
- + " consumerId={}, consumer={}",
- remoteAddress, consumerId, consumer);
- commandSender.sendSuccessResponse(requestId);
- return null;
- } else {
- // There was an early request to create a consumer with same consumerId. This can happen
+ if (!existingConsumerFuture.isDone()){
// when
// client timeout is lower the broker timeouts. We need to wait until the previous
// consumer
// creation request either complete or fails.
log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
- + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
- ServerError error = null;
- if (!existingConsumerFuture.isDone()) {
- error = ServerError.ServiceNotReady;
- } else {
- error = getErrorCode(existingConsumerFuture);
- consumers.remove(consumerId, existingConsumerFuture);
- }
- commandSender.sendErrorResponse(requestId, error,
+ + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
+ commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
"Consumer is already present on the connection");
- return null;
+ } else if (existingConsumerFuture.isCompletedExceptionally()){
+ ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true,
+ String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s",
+ remoteAddress, subscriptionName));
+ consumers.remove(consumerId, existingConsumerFuture);
+ commandSender.sendErrorResponse(requestId, error,
+ "Consumer that failed is already present on the connection");
+ } else {
+ Consumer consumer = existingConsumerFuture.getNow(null);
+ log.info("[{}] Consumer with the same id is already created:"
+ + " consumerId={}, consumer={}",
+ remoteAddress, consumerId, consumer);
+ commandSender.sendSuccessResponse(requestId);
}
+ return null;
}
-
boolean createTopicIfDoesNotExist = forceTopicCreation
&& service.isAllowAutoTopicCreation(topicName.toString());
@@ -2486,6 +2483,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
private <T> ServerError getErrorCode(CompletableFuture<T> future) {
+ return getErrorCodeWithErrorLog(future, false, null);
+ }
+
+ private <T> ServerError getErrorCodeWithErrorLog(CompletableFuture<T> future, boolean logIfError,
+ String errorMessageIfLog) {
ServerError error = ServerError.UnknownError;
try {
future.getNow(null);
@@ -2493,6 +2495,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
if (e.getCause() instanceof BrokerServiceException) {
error = BrokerServiceException.getClientErrorCode(e.getCause());
}
+ if (logIfError){
+ String finalErrorMessage = StringUtils.isNotBlank(errorMessageIfLog)
+ ? errorMessageIfLog : "Unknown Error";
+ log.error(finalErrorMessage, e);
+ }
}
return error;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index ff9c188a3f7..7e7654af089 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -53,8 +53,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -95,7 +96,6 @@ import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.api.proto.AuthMethod;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
-import org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
@@ -105,7 +105,6 @@ import org.apache.pulsar.common.api.proto.CommandSendReceipt;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandSuccess;
-import org.apache.pulsar.common.api.proto.EncryptionKeys;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
@@ -113,13 +112,16 @@ import org.apache.pulsar.common.api.proto.BaseCommand.Type;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.powermock.reflect.Whitebox;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -1848,4 +1850,136 @@ public class ServerCnxTest {
channel.finish();
}
+
+ @Test
+ public void testNeverDelayConsumerFutureWhenNotFail() throws Exception{
+ // Mock ServerCnx.field: consumers
+ ConcurrentLongHashMap consumers = Mockito.mock(ConcurrentLongHashMap.class);
+ ArgumentCaptor<Long> ignoreArgumentCaptor = ArgumentCaptor.forClass(Long.class);
+ final ArgumentCaptor<CompletableFuture> deleteTimesMark = ArgumentCaptor.forClass(CompletableFuture.class);
+ Mockito.when(consumers.remove(ignoreArgumentCaptor.capture())).thenReturn(true);
+ Mockito.when(consumers.remove(ignoreArgumentCaptor.capture(), deleteTimesMark.capture())).thenReturn(true);
+ // case1: exists existingConsumerFuture, already complete or delay done after execute 'isDone()' many times
+ // case2: exists existingConsumerFuture, delay complete after execute 'isDone()' many times
+ // Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051
+ // Try a delay of 3 stages. The simulation is successful after repeated judgments.
+ for (AtomicInteger futureWillDoneAfterDelayTimes = new AtomicInteger(1);
+ futureWillDoneAfterDelayTimes.intValue() <= 3;
+ futureWillDoneAfterDelayTimes.incrementAndGet()){
+ final AtomicInteger futureCallTimes = new AtomicInteger();
+ final Consumer mockConsumer = Mockito.mock(Consumer.class);
+ CompletableFuture existingConsumerFuture = new CompletableFuture<Consumer>(){
+
+ private boolean complete;
+
+ // delay complete after execute 'isDone()' many times
+ @Override
+ public boolean isDone() {
+ if (complete) {
+ return true;
+ }
+ int executeIsDoneCommandTimes = futureCallTimes.incrementAndGet();
+ return executeIsDoneCommandTimes >= futureWillDoneAfterDelayTimes.intValue();
+ }
+
+ // if trig "getNow()", then complete
+ @Override
+ public Consumer get(){
+ complete = true;
+ return mockConsumer;
+ }
+
+ // if trig "get()", then complete
+ @Override
+ public Consumer get(long timeout, TimeUnit unit){
+ complete = true;
+ return mockConsumer;
+ }
+
+ // if trig "get()", then complete
+ @Override
+ public Consumer getNow(Consumer ifAbsent){
+ complete = true;
+ return mockConsumer;
+ }
+
+ // never fail
+ public boolean isCompletedExceptionally(){
+ return false;
+ }
+ };
+ Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
+ // do test: delay complete after execute 'isDone()' many times
+ // Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051
+ // reset channels( serverChannel, clientChannel )
+ resetChannel();
+ Whitebox.setInternalState(serverCnx, "consumers", consumers);
+ setChannelConnected();
+ // auth check disable
+ doReturn(false).when(brokerService).isAuthenticationEnabled();
+ doReturn(false).when(brokerService).isAuthorizationEnabled();
+ // do subscribe
+ ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
+ successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
+ "test" /* consumer name */, 0 /* avoid reseting cursor */);
+ channel.writeInbound(clientCommand);
+ Object responseObj = getResponse();
+ Predicate<Object> responseAssert = obj -> {
+ if (responseObj instanceof CommandSuccess) {
+ return true;
+ }
+ if (responseObj instanceof CommandError) {
+ CommandError commandError = (CommandError) responseObj;
+ return ServerError.ServiceNotReady == commandError.getError();
+ }
+ return false;
+ };
+ // assert no consumer-delete event occur
+ assertFalse(deleteTimesMark.getAllValues().contains(existingConsumerFuture));
+ // assert without another error occur
+ assertTrue(responseAssert.test(responseAssert));
+ // Server will not close the connection
+ assertTrue(channel.isOpen());
+ channel.finish();
+ }
+ // case3: exists existingConsumerFuture, already complete and exception
+ CompletableFuture existingConsumerFuture = Mockito.mock(CompletableFuture.class);
+ Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
+ // make consumerFuture delay finish
+ Mockito.when(existingConsumerFuture.isDone()).thenReturn(true);
+ // when sync get return, future will return success value.
+ Mockito.when(existingConsumerFuture.get()).thenThrow(new NullPointerException());
+ Mockito.when(existingConsumerFuture.get(Mockito.anyLong(), Mockito.any())).
+ thenThrow(new NullPointerException());
+ Mockito.when(existingConsumerFuture.isCompletedExceptionally()).thenReturn(true);
+ Mockito.when(existingConsumerFuture.getNow(Mockito.any())).thenThrow(new NullPointerException());
+ // reset channels( serverChannel, clientChannel )
+ resetChannel();
+ Whitebox.setInternalState(serverCnx, "consumers", consumers);
+ setChannelConnected();
+ // auth check disable
+ doReturn(false).when(brokerService).isAuthenticationEnabled();
+ doReturn(false).when(brokerService).isAuthorizationEnabled();
+ // do subscribe
+ ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
+ successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
+ "test" /* consumer name */, 0 /* avoid reseting cursor */);
+ channel.writeInbound(clientCommand);
+ Object responseObj = getResponse();
+ Predicate<Object> responseAssert = obj -> {
+ if (responseObj instanceof CommandError) {
+ CommandError commandError = (CommandError) responseObj;
+ return ServerError.ServiceNotReady != commandError.getError();
+ }
+ return false;
+ };
+ // assert error response
+ assertTrue(responseAssert.test(responseAssert));
+ // assert consumer-delete event occur
+ assertEquals(1L,
+ deleteTimesMark.getAllValues().stream().filter(f -> f == existingConsumerFuture).count());
+ // Server will not close the connection
+ assertTrue(channel.isOpen());
+ channel.finish();
+ }
}