Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/25 03:28:03 UTC
[GitHub] [pulsar] mattisonchao commented on pull request #11058: [ Issue 11050 ] Fixed flaky test ServerCnxTest.
mattisonchao commented on pull request #11058:
URL: https://github.com/apache/pulsar/pull/11058#issuecomment-868175558
@lhotari
### Background
This test verifies that when subscribe requests with the same consumerId are sent concurrently, the later request receives a ServerError response. Please see below for details:
```java
// ServerCnxTest 949:980
CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
        consumerFuture);
if (existingConsumerFuture != null) {
    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
        // ... Omit some code
        commandSender.sendSuccessResponse(requestId);
        return null;
    } else {
        // There was an early request to create a consumer with same consumerId. This can happen
        // when client timeout is lower than the broker timeouts. We need to wait until the
        // previous consumer creation request either completes or fails.
        // ... Omit some code
        if (!existingConsumerFuture.isDone()) {
            error = ServerError.ServiceNotReady;
        } else {
            error = getErrorCode(existingConsumerFuture);
            consumers.remove(consumerId, existingConsumerFuture);
        }
        commandSender.sendErrorResponse(requestId, error,
                "Consumer is already present on the connection");
        return null;
    }
}
```
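The three outcomes for an existing future can be isolated in a small sketch. This is only an illustration, not broker code: the class `ExistingFutureStates`, the helper `classify`, and the returned strings are hypothetical names, and a `String` stands in for the real `Consumer` type.

```java
import java.util.concurrent.CompletableFuture;

class ExistingFutureStates {
    // Hypothetical helper that mirrors the branch structure of the snippet above.
    static String classify(CompletableFuture<String> existing) {
        if (existing.isDone() && !existing.isCompletedExceptionally()) {
            return "Success";          // consumer already created
        } else if (!existing.isDone()) {
            return "ServiceNotReady";  // earlier request still in progress
        } else {
            return "ErrorFromFuture";  // earlier request failed
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> done = CompletableFuture.completedFuture("consumer");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("creation failed"));

        System.out.println(classify(pending)); // ServiceNotReady
        System.out.println(classify(done));    // Success
        System.out.println(classify(failed));  // ErrorFromFuture
    }
}
```

Only the pending state produces `ServiceNotReady`, so the response to a duplicate request depends entirely on how far the earlier request has progressed.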
### Cause
The first subscription request has been sent, but the next subscription request is delayed (for example, by network latency). During that delay the first request may run to completion:
```java
// ServerCnxTest 1020:1041
service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
        .thenCompose(optTopic -> {
            // .... Omit some code
        })
        .thenAccept(consumer -> {
            if (consumerFuture.complete(consumer)) { // the consumer future completes here
                log.info("[{}] Created subscription on topic {} / {}",
                        remoteAddress, topicName, subscriptionName);
                commandSender.sendSuccessResponse(requestId);
            } else {
                // The consumer future was completed before by a close command
                try {
                    consumer.close();
                    log.info("[{}] Cleared consumer created after timeout on client side {}",
                            remoteAddress, consumer);
                } catch (BrokerServiceException e) {
                    log.warn(
                            "[{}] Error closing consumer created"
                                    + " after timeout on client side {}: {}",
                            remoteAddress, consumer, e.getMessage());
                }
                consumers.remove(consumerId, consumerFuture);
            }
        })
```
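The `if (consumerFuture.complete(consumer))` check relies on `CompletableFuture.complete` returning `true` only for the call that actually completes the future. A minimal sketch of that behavior, using a hypothetical `CompleteOnceSketch` class and `onCreated` helper with a `String` in place of `Consumer`:

```java
import java.util.concurrent.CompletableFuture;

class CompleteOnceSketch {
    // Hypothetical stand-in for the thenAccept callback: returns the action it would take.
    static String onCreated(CompletableFuture<String> consumerFuture, String consumer) {
        return consumerFuture.complete(consumer) ? "sendSuccessResponse" : "closeLateConsumer";
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();

        // First completion wins and the success response is sent.
        System.out.println(onCreated(future, "consumer-1")); // sendSuccessResponse

        // From this point on, the future is done and not exceptional.
        System.out.println(future.isDone() && !future.isCompletedExceptionally()); // true

        // A later attempt to complete the same future is rejected.
        System.out.println(onCreated(future, "consumer-2")); // closeLateConsumer
    }
}
```

Once the first call succeeds, any request that inspects the same future sees it as done and not exceptional.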
Therefore, by the time the second request is received, the existing future is already complete, and the request falls into the other branch:
```java
// ServerCnxTest 953:961
if (existingConsumerFuture != null) {
    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
        Consumer consumer = existingConsumerFuture.getNow(null);
        log.info("[{}] Consumer with the same id is already created:"
                        + " consumerId={}, consumer={}",
                remoteAddress, consumerId, consumer);
        commandSender.sendSuccessResponse(requestId);
        return null;
    } else {
        // ...Omit some code
    }
}
```
The test then fails, because it receives a Success response instead of the expected error.
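The ordering dependence can be reproduced deterministically in a simplified model. This is a sketch under assumptions, not the broker or test code: `SubscribeRaceSketch`, `subscribe`, and `finishCreation` are hypothetical names, the returned strings stand in for the real responses, and the asynchronous topic lookup is replaced by an explicit call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class SubscribeRaceSketch {
    private final ConcurrentMap<Long, CompletableFuture<String>> consumers =
            new ConcurrentHashMap<>();

    // Returns the response a subscribe request would get in this simplified model.
    // "Pending" means this request registered the future and started creation.
    String subscribe(long consumerId) {
        CompletableFuture<String> fresh = new CompletableFuture<>();
        CompletableFuture<String> existing = consumers.putIfAbsent(consumerId, fresh);
        if (existing == null) {
            return "Pending";
        }
        if (existing.isDone() && !existing.isCompletedExceptionally()) {
            return "Success";
        }
        if (!existing.isDone()) {
            return "ServiceNotReady";
        }
        consumers.remove(consumerId, existing);
        return "Error";
    }

    // Stands in for the asynchronous consumer creation finishing.
    void finishCreation(long consumerId) {
        consumers.get(consumerId).complete("consumer-" + consumerId);
    }

    public static void main(String[] args) {
        // Second request arrives while the first is still pending.
        SubscribeRaceSketch prompt = new SubscribeRaceSketch();
        prompt.subscribe(1L);
        System.out.println(prompt.subscribe(1L)); // ServiceNotReady

        // Second request is delayed until after the first has completed.
        SubscribeRaceSketch delayed = new SubscribeRaceSketch();
        delayed.subscribe(1L);
        delayed.finishCreation(1L);
        System.out.println(delayed.subscribe(1L)); // Success
    }
}
```

The same pair of requests yields an error or a success depending only on whether creation finishes in between, which matches the flaky behavior described here.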
### Solution
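Reducing the time gap between sending the two requests makes it much less likely that the first request completes before the second one arrives, which makes the test more stable.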
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org