You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/09/25 11:16:44 UTC
[pulsar] branch master updated: [tests] Issue 2639:
NullPointerException at closing consumer (#2640)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a4d1624 [tests] Issue 2639: NullPointerException at closing consumer (#2640)
a4d1624 is described below
commit a4d162448d63b4eea8edcdde0797dce877bba2f2
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Tue Sep 25 04:16:39 2018 -0700
[tests] Issue 2639: NullPointerException at closing consumer (#2640)
* [tests] Issue 2639: NullPointerException at closing consumer
*Motivation*
Connection can be null when closing consumer.
*Changes*
Check if the connection is null or not during closing.
* Address comments
---
.../apache/pulsar/client/impl/ConsumerImpl.java | 44 +++++++++++++---------
1 file changed, 26 insertions(+), 18 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index f1db613..826d977 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -689,32 +689,40 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
acknowledgmentsGroupingTracker.close();
long requestId = client.newRequestId();
- ByteBuf cmd = Commands.newCloseConsumer(consumerId, requestId);
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
ClientCnx cnx = cnx();
- cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
- cnx.removeConsumer(consumerId);
- if (exception == null || !cnx.ctx().channel().isActive()) {
- log.info("[{}] [{}] Closed consumer", topic, subscription);
- setState(State.Closed);
- unAckedMessageTracker.close();
- if (possibleSendToDeadLetterTopicMessages != null) {
- possibleSendToDeadLetterTopicMessages.clear();
+ if (null == cnx) {
+ cleanupAtClose(closeFuture);
+ } else {
+ ByteBuf cmd = Commands.newCloseConsumer(consumerId, requestId);
+ cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
+ cnx.removeConsumer(consumerId);
+ if (exception == null || !cnx.ctx().channel().isActive()) {
+ cleanupAtClose(closeFuture);
+ } else {
+ closeFuture.completeExceptionally(exception);
}
- closeFuture.complete(null);
- client.cleanupConsumer(this);
- // fail all pending-receive futures to notify application
- failPendingReceive();
- } else {
- closeFuture.completeExceptionally(exception);
- }
- return null;
- });
+ return null;
+ });
+ }
return closeFuture;
}
+ private void cleanupAtClose(CompletableFuture<Void> closeFuture) {
+ log.info("[{}] [{}] Closed consumer", topic, subscription);
+ setState(State.Closed);
+ unAckedMessageTracker.close();
+ if (possibleSendToDeadLetterTopicMessages != null) {
+ possibleSendToDeadLetterTopicMessages.clear();
+ }
+ closeFuture.complete(null);
+ client.cleanupConsumer(this);
+ // fail all pending-receive futures to notify application
+ failPendingReceive();
+ }
+
private void failPendingReceive() {
lock.readLock().lock();
try {