You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/25 03:28:03 UTC

[GitHub] [pulsar] mattisonchao commented on pull request #11058: [ Issue 11050 ] Fixed flaky test ServerCnxTest.

mattisonchao commented on pull request #11058:
URL: https://github.com/apache/pulsar/pull/11058#issuecomment-868175558


   @lhotari
   ### Background
   
   This test ensures that consumers who send subscription requests concurrently will return ServerError. Please see below for details:
   ```java
   ServerCnxTest  949:980
   
                           CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
                           CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
                                   consumerFuture);
                           if (existingConsumerFuture != null) {
                               if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
                                   //... Omit some code
                                   commandSender.sendSuccessResponse(requestId);
                                   return null;
                               } else {
                                   // There was an early request to create a consumer with same consumerId. This can happen
                                   // when
                                   // client timeout is lower the broker timeouts. We need to wait until the previous
                                   // consumer
                                   // creation request either complete or fails.
                                   // ... Omit some code
                                   if (!existingConsumerFuture.isDone()) {
                                       error = ServerError.ServiceNotReady;
                                   } else {
                                       error = getErrorCode(existingConsumerFuture);
                                       consumers.remove(consumerId, existingConsumerFuture);
                                   }
                                   commandSender.sendErrorResponse(requestId, error,
                                           "Consumer is already present on the connection");
                                   return null;
                               }                 
   ```
   ### Cause
   
   The first subscription request has been sent, but the next subscription request is delayed (may be due to network delays or other reasons.). So the first request may continue to run until it completes.
   ```java
   ServerCnxTest  1020:1041
   service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
                                   .thenCompose(optTopic -> {
                                     // .... Omit some code
                                   })
    .thenAccept(consumer -> {
                                       if (consumerFuture.complete(consumer)) {     // the consumer complete
                                           log.info("[{}] Created subscription on topic {} / {}",
                                                   remoteAddress, topicName, subscriptionName);
                                           commandSender.sendSuccessResponse(requestId);
                                       } else {
                                           // The consumer future was completed before by a close command
                                           try {
                                               consumer.close();
                                               log.info("[{}] Cleared consumer created after timeout on client side {}",
                                                       remoteAddress, consumer);
                                           } catch (BrokerServiceException e) {
                                               log.warn(
                                                       "[{}] Error closing consumer created"
                                                               + " after timeout on client side {}: {}",
                                                       remoteAddress, consumer, e.getMessage());
                                           }
                                           consumers.remove(consumerId, consumerFuture);
                                       }
                                   })
   
   ```
   Therefore, when the second request is received, it will be judged as another situation.
   
   ```java
   ServerCnxTest  953:961
   
                    if (existingConsumerFuture != null) {
                               if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
                                   Consumer consumer = existingConsumerFuture.getNow(null);
                                   log.info("[{}] Consumer with the same id is already created:"
                                            + " consumerId={}, consumer={}",
                                            remoteAddress, consumerId, consumer);
                                   commandSender.sendSuccessResponse(requestId);
                                   return null;
                               } else {
                     //  ...Omit some code
                                }
                     }
   ```
   And then, the test will fail. due to Success response.
   
   Solution
   
   Reducing the time gap between sending requests can optimize the test.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org