Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/06 16:49:36 UTC

[GitHub] [pulsar] poorbarcode opened a new pull request, #15051: [fix] [broker] make ServerCnx.handleSubscribe thread safety

poorbarcode opened a new pull request, #15051:
URL: https://github.com/apache/pulsar/pull/15051

   Fixes #15050
   
   Master Issue: #15050
   
   ### Motivation
   see #14970
   see #15050
   
   ### Modifications
   
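   Reorder the checks on the existing consumer future in `ServerCnx.handleSubscribe`: handle a still-pending future first, then a failed one, then a successfully completed one.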
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Documentation
   
   - [ ] `doc-required` 
   - [x] `no-need-doc` 
   - [ ] `doc` 
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1196250804

   > @poorbarcode Could you migrate this PR to branch-2.8? It looks like the Mockito version of branch-2.8 doesn't support the creation of static mocks.
   
   ok




[GitHub] [pulsar] poorbarcode closed pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode closed pull request #15051: [fix][broker] Fix potential to add duplicated consumer
URL: https://github.com/apache/pulsar/pull/15051




[GitHub] [pulsar] Jason918 commented on a diff in pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#discussion_r846947659


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -992,32 +992,29 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                 }
 
                 if (existingConsumerFuture != null) {
-                    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
-                        Consumer consumer = existingConsumerFuture.getNow(null);
-                        log.info("[{}] Consumer with the same id is already created:"
-                                 + " consumerId={}, consumer={}",
-                                 remoteAddress, consumerId, consumer);
-                        commandSender.sendSuccessResponse(requestId);
-                        return null;
-                    } else {
+                    if (!existingConsumerFuture.isDone()){
                         // There was an early request to create a consumer with same consumerId. This can happen
                         // when
                         // client timeout is lower the broker timeouts. We need to wait until the previous
                         // consumer
                         // creation request either complete or fails.
                         log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
-                                 + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
-                        ServerError error = null;
-                        if (!existingConsumerFuture.isDone()) {
-                            error = ServerError.ServiceNotReady;
-                        } else {
-                            error = getErrorCode(existingConsumerFuture);
-                            consumers.remove(consumerId, existingConsumerFuture);
-                        }
-                        commandSender.sendErrorResponse(requestId, error,
+                                + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
+                        commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                                 "Consumer is already present on the connection");
-                        return null;
+                    } else if (existingConsumerFuture.isCompletedExceptionally()){
+                        ServerError error = getErrorCode(existingConsumerFuture);

Review Comment:
   Better to keep the warning log here too.





[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1128860686

   /pulsarbot run-failure-checks




[GitHub] [pulsar] lhotari commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
lhotari commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1092496551

   > It's just a guess at one possible cause of the #14970 problem.
   > 
   > Because `consumerSet` holds only one instance, the two consumer instances must have the same `address` and `consumerId`, which leaves only two possibilities:
   > 
   > * The consumer restarted and reused the same SocketAddress.
   > * One consumer instance executed command-subscribe several times within a short period.
   > 
   > After reviewing the code for the second possibility, I found a case where it can happen and fixed it.
   > 
   > There may be other problems that can reproduce #14970.
   
   I don't think that this PR fixes that issue. ServerCnx is tied to a single TCP/IP connection and even without the changes in this PR, duplicates on the same connection are prevented. 
   
   As mentioned before, this PR is useful since it fixes a race condition and returns ServiceNotReady in the case that the future completes after the check has been made (as explained in https://github.com/apache/pulsar/pull/15051#issuecomment-1092431543). 
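
The race described above can be made deterministic in a small sketch. Everything here is hypothetical illustration rather than Pulsar code: the class `SubscribeCheckOrderDemo`, the `Reply` enum, and the `CompletesAfterFirstCheck` test double (a future that completes itself normally right after its first `isDone()` call has returned `false`) are assumed names. `oldOrder` mirrors the check order removed in the diff, and `newOrder` mirrors the order the PR introduces.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; none of these names come from the Pulsar code base.
class SubscribeCheckOrderDemo {

    enum Reply { SUCCESS, SERVICE_NOT_READY, ERROR_AND_REMOVE }

    // Test double: completes itself normally right after the first isDone()
    // call has returned false, simulating a future that finishes between checks.
    static class CompletesAfterFirstCheck extends CompletableFuture<String> {
        private boolean firstCall = true;

        @Override
        public boolean isDone() {
            boolean done = super.isDone();
            if (firstCall) {
                firstCall = false;
                complete("consumer");
            }
            return done;
        }
    }

    // Check order before the PR: success test first, then a second isDone() call.
    static Reply oldOrder(CompletableFuture<String> f) {
        if (f.isDone() && !f.isCompletedExceptionally()) {
            return Reply.SUCCESS;
        }
        if (!f.isDone()) {
            return Reply.SERVICE_NOT_READY;
        }
        // Reached here even when the future completed successfully in between.
        return Reply.ERROR_AND_REMOVE;
    }

    // Check order after the PR: pending first, then failed, then success.
    static Reply newOrder(CompletableFuture<String> f) {
        if (!f.isDone()) {
            return Reply.SERVICE_NOT_READY;
        } else if (f.isCompletedExceptionally()) {
            return Reply.ERROR_AND_REMOVE;
        }
        return Reply.SUCCESS;
    }

    public static void main(String[] args) {
        System.out.println(oldOrder(new CompletesAfterFirstCheck())); // ERROR_AND_REMOVE
        System.out.println(newOrder(new CompletesAfterFirstCheck())); // SERVICE_NOT_READY
    }
}
```

A future only ever moves from pending to done, so testing `!isDone()` first means every later branch sees a state that can no longer change.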
   
   




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1092500743

   > I don't think that this PR fixes that issue. ServerCnx is tied to a single TCP/IP connection and even without the changes in this PR, duplicates on the same connection are prevented.
   
   @lhotari 
   You may have overlooked the asynchronous code path in `ServerCnx.handleSubscribe`. Without the asynchronous execution, you would be right.
   
   




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1092436056

   > > > Seems that this PR only enhances the exception handling. Could you provide more information on how this PR fixed the duplicated consumer issue?
   > > 
   > > 
   > > The key problem was that a running CompletableFuture could be removed; the logic of the original code was not rigorous enough. This PR solves that.
   > 
   > I was wondering about the same thing, whether this is just a refactoring. This does fix a race condition in providing the error message. It's possible that the CompletableFuture completes after it has been checked. In that case, the old code might have provided a wrong error code.
   > 
   > @poorbarcode I don't think that this PR prevents duplicated consumers. Could you explain how it achieves that?
   
   @lhotari 
   
   This PR fixes the following: when a consumer operation goes wrong, a duplicated consumer can be registered.
   
   You can reproduce the problem like this (the code is in the attachments):
   
   - Overwrite ConsumerImpl.java with the attached file. This change executes command-subscribe several times within a short period.
   - Overwrite ServerCnx.java with the attached file. This change controls the execution order of the threads, raising the probability of the problem to 100%.
   - Add SubscribeProcessController.java, a controller for the execution order of multiple threads.
   
   [ConsumerImpl.java.txt](https://github.com/apache/pulsar/files/8448567/ConsumerImpl.java.txt)
   [ServerCnx.java.txt](https://github.com/apache/pulsar/files/8448568/ServerCnx.java.txt)
   [SubscribeProcessController.java.txt](https://github.com/apache/pulsar/files/8448569/SubscribeProcessController.java.txt)
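
The attachments are not reproduced in this archive. As a rough idea of how a thread execution order can be forced, here is a minimal sketch using `CountDownLatch`; it is an assumption about the general technique, not the content of SubscribeProcessController.java, and the names `OrderControlDemo` and `runInterleaved` are hypothetical. One thread checks the future, a second thread completes it, and the first thread checks again only afterwards.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch; not the attached SubscribeProcessController.
class OrderControlDemo {

    // Returns what the checking thread saw: {first isDone(), second isDone()}.
    static boolean[] runInterleaved() {
        CompletableFuture<String> future = new CompletableFuture<>();
        CountDownLatch firstCheckDone = new CountDownLatch(1);
        CountDownLatch futureCompleted = new CountDownLatch(1);
        boolean[] observed = new boolean[2];

        Thread checker = new Thread(() -> {
            observed[0] = future.isDone();   // step 1: future still pending
            firstCheckDone.countDown();
            awaitQuietly(futureCompleted);   // wait for step 2
            observed[1] = future.isDone();   // step 3: future now completed
        });
        Thread completer = new Thread(() -> {
            awaitQuietly(firstCheckDone);    // wait for step 1
            future.complete("consumer");     // step 2
            futureCompleted.countDown();
        });

        checker.start();
        completer.start();
        try {
            checker.join();
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] seen = runInterleaved();
        System.out.println(seen[0] + " then " + seen[1]); // false then true
    }
}
```

Because each step waits on a latch released by the previous one, the interleaving is the same on every run instead of depending on scheduler timing.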
   




[GitHub] [pulsar] RobertIndie commented on a diff in pull request #15051: [fix] [broker] Fix potential risk: Command-Subscribe add duplicated consumer

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#discussion_r844808216


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -992,32 +992,24 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                 }
 
                 if (existingConsumerFuture != null) {
-                    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
+                    if (!existingConsumerFuture.isDone()){
+                        log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
+                                + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
+                        commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
+                                "Consumer is already present on the connection");
+                    } else if (existingConsumerFuture.isCompletedExceptionally()){
+                        ServerError error = getErrorCode(existingConsumerFuture);
+                        consumers.remove(consumerId, existingConsumerFuture);
+                        commandSender.sendErrorResponse(requestId, error,
+                                "Last subscribe failure, please try again.");
+                    } else {
                         Consumer consumer = existingConsumerFuture.getNow(null);
                         log.info("[{}] Consumer with the same id is already created:"
-                                 + " consumerId={}, consumer={}",
-                                 remoteAddress, consumerId, consumer);
+                                        + " consumerId={}, consumer={}",
+                                remoteAddress, consumerId, consumer);
                         commandSender.sendSuccessResponse(requestId);
-                        return null;
-                    } else {
-                        // There was an early request to create a consumer with same consumerId. This can happen

Review Comment:
   Why do we delete these comments?





[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1094296426

   /pulsarbot run-failure-checks




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1092387849

   > Seems that this PR only enhances the exception handling. Could you provide more information on how this PR fixed the duplicated consumer issue?
   
   The key problem was that a running CompletableFuture could be removed; the logic of the original code was not rigorous enough. This PR solves that.
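
To illustrate the argument about removing a future from the registry, here is a small sketch. It assumes the `consumers` map behaves like `java.util.concurrent.ConcurrentHashMap` with respect to `putIfAbsent` and the two-argument `remove`; the class `ConsumerRegistryDemo` and its methods are hypothetical names, and `String` stands in for the consumer type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the per-connection consumer registry.
class ConsumerRegistryDemo {

    private final ConcurrentHashMap<Long, CompletableFuture<String>> consumers =
            new ConcurrentHashMap<>();

    // Registers the future unless the id is already mapped.
    // Returns true only if this call created the mapping.
    boolean register(long consumerId, CompletableFuture<String> future) {
        return consumers.putIfAbsent(consumerId, future) == null;
    }

    // Removes the mapping only if the id is still mapped to this exact future.
    boolean removeIfSame(long consumerId, CompletableFuture<String> future) {
        return consumers.remove(consumerId, future);
    }

    public static void main(String[] args) {
        ConsumerRegistryDemo registry = new ConsumerRegistryDemo();
        CompletableFuture<String> first = CompletableFuture.completedFuture("consumer-1");
        CompletableFuture<String> second = new CompletableFuture<>();

        System.out.println(registry.register(1L, first));      // true
        System.out.println(registry.register(1L, second));     // false: id already taken
        System.out.println(registry.removeIfSame(1L, second)); // false: different future
        System.out.println(registry.removeIfSame(1L, first));  // true: entry removed
        System.out.println(registry.register(1L, second));     // true: same id registered again
    }
}
```

The last two lines show the concern: once a future that actually succeeded is removed on the error path, a later subscribe with the same id is accepted as if no consumer existed.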




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #15051: [fix] [broker] Fix potential risk: Command-Subscribe add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#discussion_r844608754


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2737,6 +2729,7 @@ public void cancelPublishBufferLimiting() {
             if (e.getCause() instanceof BrokerServiceException) {
                 error = BrokerServiceException.getClientErrorCode(e.getCause());
             }
+            log.error("some thing error.", e);

Review Comment:
   That is a shared helper for getting the error code from a CompletableFuture, so a more specific description cannot be added here.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#discussion_r851070380


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -992,32 +992,29 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                 }
 
                 if (existingConsumerFuture != null) {
-                    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
-                        Consumer consumer = existingConsumerFuture.getNow(null);
-                        log.info("[{}] Consumer with the same id is already created:"
-                                 + " consumerId={}, consumer={}",
-                                 remoteAddress, consumerId, consumer);
-                        commandSender.sendSuccessResponse(requestId);
-                        return null;
-                    } else {
+                    if (!existingConsumerFuture.isDone()){
                         // There was an early request to create a consumer with same consumerId. This can happen
                         // when
                         // client timeout is lower the broker timeouts. We need to wait until the previous
                         // consumer
                         // creation request either complete or fails.
                         log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
-                                 + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
-                        ServerError error = null;
-                        if (!existingConsumerFuture.isDone()) {
-                            error = ServerError.ServiceNotReady;
-                        } else {
-                            error = getErrorCode(existingConsumerFuture);
-                            consumers.remove(consumerId, existingConsumerFuture);
-                        }
-                        commandSender.sendErrorResponse(requestId, error,
+                                + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
+                        commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                                 "Consumer is already present on the connection");
-                        return null;
+                    } else if (existingConsumerFuture.isCompletedExceptionally()){
+                        ServerError error = getErrorCode(existingConsumerFuture);

Review Comment:
   @Jason918 Fixed. Could you take a look?





[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1112863405

   /pulsarbot run-failure-checks




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#discussion_r845727990


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -992,32 +992,29 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                 }
 
                 if (existingConsumerFuture != null) {
-                    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
-                        Consumer consumer = existingConsumerFuture.getNow(null);
-                        log.info("[{}] Consumer with the same id is already created:"
-                                 + " consumerId={}, consumer={}",
-                                 remoteAddress, consumerId, consumer);
-                        commandSender.sendSuccessResponse(requestId);
-                        return null;
-                    } else {
+                    if (!existingConsumerFuture.isDone()){
                         // There was an early request to create a consumer with same consumerId. This can happen
                         // when
                         // client timeout is lower the broker timeouts. We need to wait until the previous
                         // consumer
                         // creation request either complete or fails.
                         log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
-                                 + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
-                        ServerError error = null;
-                        if (!existingConsumerFuture.isDone()) {
-                            error = ServerError.ServiceNotReady;
-                        } else {
-                            error = getErrorCode(existingConsumerFuture);
-                            consumers.remove(consumerId, existingConsumerFuture);
-                        }
-                        commandSender.sendErrorResponse(requestId, error,
+                                + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
+                        commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                                 "Consumer is already present on the connection");
-                        return null;
+                    } else if (existingConsumerFuture.isCompletedExceptionally()){
+                        ServerError error = getErrorCode(existingConsumerFuture);
+                        consumers.remove(consumerId, existingConsumerFuture);
+                        commandSender.sendErrorResponse(requestId, error,
+                                "Last subscribe failure, please try again.");

Review Comment:
   @lhotari  fixed





[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1103016444

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1102150885

   /pulsarbot run-failure-checks
   
   




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1128893642

   @codelipenghui Could you take a look?




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#discussion_r845125480


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -992,32 +992,24 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                 }
 
                 if (existingConsumerFuture != null) {
-                    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
+                    if (!existingConsumerFuture.isDone()){
+                        log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
+                                + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
+                        commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
+                                "Consumer is already present on the connection");
+                    } else if (existingConsumerFuture.isCompletedExceptionally()){
+                        ServerError error = getErrorCode(existingConsumerFuture);
+                        consumers.remove(consumerId, existingConsumerFuture);
+                        commandSender.sendErrorResponse(requestId, error,
+                                "Last subscribe failure, please try again.");
+                    } else {
                         Consumer consumer = existingConsumerFuture.getNow(null);
                         log.info("[{}] Consumer with the same id is already created:"
-                                 + " consumerId={}, consumer={}",
-                                 remoteAddress, consumerId, consumer);
+                                        + " consumerId={}, consumer={}",
+                                remoteAddress, consumerId, consumer);
                         commandSender.sendSuccessResponse(requestId);
-                        return null;
-                    } else {
-                        // There was an early request to create a consumer with same consumerId. This can happen

Review Comment:
   Already restored.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#discussion_r845124757


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2737,6 +2729,7 @@ public void cancelPublishBufferLimiting() {
             if (e.getCause() instanceof BrokerServiceException) {
                 error = BrokerServiceException.getClientErrorCode(e.getCause());
             }
+            log.error("some thing error.", e);

Review Comment:
   already deleted





[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1092498710

   > > It's just a guess at one possible cause of the #14970 problem.
   > > Because `consumerSet` holds only one instance, the two consumer instances must have the same `address` and `consumerId`, which leaves only two possibilities:
   > > 
   > > * The consumer restarted and reused the same SocketAddress.
   > > * One consumer instance executed command-subscribe several times within a short period.
   > > 
   > > After reviewing the code for the second possibility, I found a case where it can happen and fixed it.
   > > There may be other problems that can reproduce #14970.
   > 
   > I don't think that this PR fixes that issue. ServerCnx is tied to a single TCP/IP connection and even without the changes in this PR, duplicates on the same connection are prevented.
   > 
   > As mentioned before, this PR is useful since it fixes a race condition and returns ServiceNotReady in the case that the future completes after the check has been made (as explained in [#15051 (comment)](https://github.com/apache/pulsar/pull/15051#issuecomment-1092431543)).
   
   
   
   > > poorbarcode reopened this 1 hour ago
   > 
   > > /pulsarbot run-failure-checks
   > 
   > Please be aware of this: [apache/pulsar-test-infra#28 (comment)](https://github.com/apache/pulsar-test-infra/pull/28#issuecomment-1092079756)
   > 
   > There shouldn't be a need to close & re-open the PR to rerun failed jobs. Please read the above explanations to understand what some failures are about. They don't need to clear off to be able to merge the PR.
   
   My point is wrong
   




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1092478018

   /pulsarbot run-failure-checks




[GitHub] [pulsar] BewareMyPower commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1196232576

   @poorbarcode Could you migrate this PR to branch-2.8? It looks like the Mockito version of branch-2.8 doesn't support the creation of static mocks.
   
   ```
   org.mockito.exceptions.base.MockitoException: 
   The used MockMaker PowerMockMaker does not support the creation of static mocks
   
   Mockito's inline mock maker supports static mocks based on the Instrumentation API.
   You can simply enable this mock mode, by placing the 'mockito-inline' artifact where you are currently using 'mockito-core'.
   Note that Mockito's inline mock maker is not supported on Android.
   
   	at org.apache.pulsar.broker.service.ServerCnxTest.testNeverDelayConsumerFutureWhenNotFail(ServerCnxTest.java:1921)
   ```




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1128566364

   /pulsarbot run-failure-checks




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1092471076

   > > > @poorbarcode I don't think that this PR prevents duplicated consumers. Could you explain how it achieves that?
   > > 
   > > 
   > > @lhotari @RobertIndie
   > > This PR fixes the following: when a consumer operation goes wrong, a duplicate consumer can be registered.
   > > You can reproduce the problem like this (the code is in the attachment):
   > 
   > I'm sorry I didn't explain clearly what I meant. Please explain how this PR prevents adding duplicate consumers. I don't see a change in behavior for that particular detail. I do acknowledge that this PR makes sense to fix a race where the consumer gets removed when it shouldn't. (I explained that in my previous comment.) However, I don't see how this prevents adding the duplicate consumers that appear in #14970.
   
   It's just a guess at one possible cause of the problem in #14970.
   
   Because `consumerSet` contains only one instance, the two consumer instances must have the same `address` and `consumerId`, which leaves only two possibilities:
   
   - The consumer restarted and reused the same SocketAddress.
   - One consumer instance executed the subscribe command several times within a short period.
   
   While reviewing the code for the second possibility, I found a case where it can happen and fixed it.
   
   There may be other problems that can also cause #14970.
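
The set-versus-list reasoning above can be sketched as follows. This is only an illustration under stated assumptions: `ConsumerIdentity` and `distinctCount` are hypothetical names, and the sketch assumes that consumer equality is defined by `address` and `consumerId`, as the comment implies. It is not the broker's actual `Consumer` class.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for a consumer whose identity is (address, consumerId). */
final class ConsumerIdentity {
    final String address;
    final long consumerId;

    ConsumerIdentity(String address, long consumerId) {
        this.address = address;
        this.consumerId = consumerId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ConsumerIdentity)) {
            return false;
        }
        ConsumerIdentity other = (ConsumerIdentity) o;
        return consumerId == other.consumerId && address.equals(other.address);
    }

    @Override
    public int hashCode() {
        return Objects.hash(address, consumerId);
    }

    /** Number of entries that survive set-based de-duplication. */
    static int distinctCount(List<ConsumerIdentity> consumers) {
        return new HashSet<>(consumers).size();
    }
}
```

If a list holds two entries while `distinctCount` reports one, the two entries share both fields, which is the situation the two listed possibilities try to explain.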




[GitHub] [pulsar] lhotari commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
lhotari commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1092461922

   > > @poorbarcode I don't think that this PR prevents duplicated consumers. Could you explain how it achieves that?
   > 
   > @lhotari @RobertIndie
   > 
   > This PR fixes the following: when a consumer operation goes wrong, a duplicate consumer can be registered.
   > 
   > You can reproduce the problem like this (the code is in the attachment):
   
   I'm sorry I didn't explain clearly what I meant. Please explain how this PR prevents adding duplicate consumers. I don't see a change in behavior for that particular detail. I do acknowledge that this PR makes sense to fix a race where the consumer gets removed when it shouldn't. (I explained that in my previous comment.) However, I don't see how this prevents adding the duplicate consumers that appear in #14970.
   
   
   




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1129477025

   /pulsarbot run-failure-checks




[GitHub] [pulsar] congbobo184 merged pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
congbobo184 merged PR #15051:
URL: https://github.com/apache/pulsar/pull/15051




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1100046204

   /pulsarbot run-failure-checks




[GitHub] [pulsar] RobertIndie commented on a diff in pull request #15051: [fix] [broker] Fix potential risk: Command-Subscribe add duplicated consumer

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#discussion_r844799277


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2737,6 +2729,7 @@ public void cancelPublishBufferLimiting() {
             if (e.getCause() instanceof BrokerServiceException) {
                 error = BrokerServiceException.getClientErrorCode(e.getCause());
             }
+            log.error("some thing error.", e);

Review Comment:
   Why do we need this log here?





[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1094296347

   > LGTM. Great work! Please add a unit test to prevent regression.
   
   finished




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1100045183

   > LGTM
   
   
   
   > LGTM
   
   Thanks




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1127173816

   /pulsarbot run-failure-checks




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1128861697

   /pulsarbot run-failure-checks




[GitHub] [pulsar] lhotari commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
lhotari commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1092498038

   > poorbarcode reopened this 1 hour ago
   
   > /pulsarbot run-failure-checks
   
   Please be aware of this: https://github.com/apache/pulsar-test-infra/pull/28#issuecomment-1092079756
   
   There shouldn't be a need to close & re-open the PR to rerun failed jobs. Please read the above explanations to understand what some failures are about. They don't need to clear off to be able to merge the PR. 
   
   




[GitHub] [pulsar] HQebupt commented on a diff in pull request #15051: [fix] [broker] Fix ServerCnx.handleSubscribe add potentially duplicated consumer issue

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#discussion_r844598615


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2737,6 +2729,7 @@ public void cancelPublishBufferLimiting() {
             if (e.getCause() instanceof BrokerServiceException) {
                 error = BrokerServiceException.getClientErrorCode(e.getCause());
             }
+            log.error("some thing error.", e);

Review Comment:
   Please use a more meaningful error log message.





[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1092395737

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] lhotari commented on a diff in pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#discussion_r845726319


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -992,32 +992,29 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                 }
 
                 if (existingConsumerFuture != null) {
-                    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
-                        Consumer consumer = existingConsumerFuture.getNow(null);
-                        log.info("[{}] Consumer with the same id is already created:"
-                                 + " consumerId={}, consumer={}",
-                                 remoteAddress, consumerId, consumer);
-                        commandSender.sendSuccessResponse(requestId);
-                        return null;
-                    } else {
+                    if (!existingConsumerFuture.isDone()){
                         // There was an early request to create a consumer with same consumerId. This can happen
                         // when
                         // client timeout is lower the broker timeouts. We need to wait until the previous
                         // consumer
                         // creation request either complete or fails.
                         log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
-                                 + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
-                        ServerError error = null;
-                        if (!existingConsumerFuture.isDone()) {
-                            error = ServerError.ServiceNotReady;
-                        } else {
-                            error = getErrorCode(existingConsumerFuture);
-                            consumers.remove(consumerId, existingConsumerFuture);
-                        }
-                        commandSender.sendErrorResponse(requestId, error,
+                                + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
+                        commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                                 "Consumer is already present on the connection");
-                        return null;
+                    } else if (existingConsumerFuture.isCompletedExceptionally()){
+                        ServerError error = getErrorCode(existingConsumerFuture);
+                        consumers.remove(consumerId, existingConsumerFuture);
+                        commandSender.sendErrorResponse(requestId, error,
+                                "Last subscribe failure, please try again.");

Review Comment:
   I don't think there needs to be a "please try again" in the technical error message. :) 
   `"Consumer that failed is already present on the connection"` would be more aligned with the other error messages.
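
As a side note on the `consumers.remove(consumerId, existingConsumerFuture)` call in the diff above, here is a minimal sketch of why a two-argument, value-checked remove is the safer choice. It assumes the map behaves like `java.util.concurrent.ConcurrentHashMap` (the broker's own map type may differ), and `ConditionalRemoveSketch` and `removeIfSame` are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper illustrating a value-checked removal. */
final class ConditionalRemoveSketch {

    /**
     * Removes the entry only if consumerId is still mapped to the exact future
     * that was observed earlier. CompletableFuture does not override equals(),
     * so the comparison is by identity. If a retry has already replaced the
     * entry with a new future, nothing is removed and false is returned.
     */
    static boolean removeIfSame(ConcurrentHashMap<Long, CompletableFuture<String>> consumers,
                                long consumerId,
                                CompletableFuture<String> observed) {
        return consumers.remove(Long.valueOf(consumerId), observed);
    }
}
```

With a plain `remove(consumerId)`, a newer in-flight future registered under the same id could be dropped by mistake. The value-checked form leaves it in place.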








[GitHub] [pulsar] lhotari commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
lhotari commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1092431543

   > > Seems that this PR only enhances the exception handling. Could you provide more information on how this PR fixed the duplicated consumer issue?
   > 
   > The key problem was the possibility of deleting a still-running CompletableFuture; the logic of the original code was not rigorous enough. This PR fixes that.
   
   I was wondering the same thing: whether this is just a refactoring. It does fix a race condition in providing the error message. It's possible that the CompletableFuture completes after it has been checked. In that case, the old code might have provided a wrong error message.
   
   @poorbarcode I don't think that this PR prevents duplicated consumers. Could you explain how it achieves that?
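
The race described above can be illustrated with a small sketch. It is not the `ServerCnx` code. `ConsumerFutureState` and `classify` are hypothetical names, and the sketch only shows the order of checks that the new branch structure in this PR appears to follow: test `isDone()` first, and look at the outcome only once the future is known to be complete.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper: classifies a consumer future by inspecting it in a safe order. */
final class ConsumerFutureState {
    enum State { PENDING, FAILED, CREATED }

    static State classify(CompletableFuture<?> future) {
        // A future only moves from "not done" to "done", never back, so once
        // isDone() returns true the outcome can no longer change.
        if (!future.isDone()) {
            // Still in flight: report "not ready" and leave the map entry alone.
            return State.PENDING;
        }
        if (future.isCompletedExceptionally()) {
            // Definitely failed: the entry can be removed and the error reported.
            return State.FAILED;
        }
        // Definitely succeeded: the consumer already exists.
        return State.CREATED;
    }
}
```

In the old code the first test was `isDone() && !isCompletedExceptionally()`. A future that completed successfully just after that test failed could then fall into the branch that reads an error code and removes the entry, which is the wrong outcome the comment refers to.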
   
   
   
   
   
   




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1092462956

   > 
   
   




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1094296299

   > Good catch! Could you please add a test for this, to prevent it from being changed back later?
   
   finished.




[GitHub] [pulsar] poorbarcode commented on pull request #15051: [fix][broker] Fix potential to add duplicated consumer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #15051:
URL: https://github.com/apache/pulsar/pull/15051#issuecomment-1128382108

   /pulsarbot run-failure-checks

