Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/18 10:28:13 UTC
[GitHub] [pulsar] liangyepianzhou opened a new issue, #16124: [Bug][Client] NullPointerException for doTransactionAcknowledgeForResponse
liangyepianzhou opened a new issue, #16124:
URL: https://github.com/apache/pulsar/issues/16124
**Describe the bug**
When a consumer reconnects to the broker after `channelInactive`, its `cnx` is set to null. However, `doTransactionAcknowledgeForResponse` still calls `cnx().newAckForReceipt(cmd, requested);`, which throws a NullPointerException. The relevant code paths are:
* connectionClosed
``` java
if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
...
state.setState(State.Connecting);
....
}
```
* doAcknowledge
```java
if (getState() != State.Ready && getState() != State.Connecting) {
...
}
if (txn != null) {
return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties,
new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
}
return acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, ackType, properties);
```
* doTransactionAcknowledgeForResponse
```java
...
return cnx().newAckForReceipt(cmd, requested);
```
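The race described above can be illustrated outside of Pulsar. The following is a minimal sketch, not the actual client code or the fix that was applied: `FakeCnx`, `FakeConsumer`, and `ackWithTxn` are hypothetical stand-ins invented for this example. It assumes that one reasonable way to avoid the NullPointerException is to read the connection reference once and return a failed future when it is null.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-ins for illustration only; these are NOT the Pulsar client classes.
class NullSafeAckSketch {

    /** Stand-in for a connection that sends an ack and returns a receipt future. */
    static class FakeCnx {
        CompletableFuture<Void> newAckForReceipt(String cmd, long requestId) {
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Stand-in for a consumer whose connection reference is cleared on disconnect. */
    static class FakeConsumer {
        private final AtomicReference<FakeCnx> cnx = new AtomicReference<>();

        void connectionOpened(FakeCnx c) {
            cnx.set(c);
        }

        // Mirrors the compareAndSet(..., cnx, null) step: the reference is null while reconnecting.
        void connectionClosed(FakeCnx c) {
            cnx.compareAndSet(c, null);
        }

        CompletableFuture<Void> ackWithTxn(String cmd, long requestId) {
            // Read the reference once so a concurrent disconnect cannot null it between check and use.
            FakeCnx current = cnx.get();
            if (current == null) {
                CompletableFuture<Void> failed = new CompletableFuture<>();
                failed.completeExceptionally(
                        new IllegalStateException("Consumer is not connected; cannot ack with a transaction"));
                return failed;
            }
            return current.newAckForReceipt(cmd, requestId);
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        FakeCnx cnx = new FakeCnx();

        consumer.connectionOpened(cnx);
        System.out.println("connected, ack failed: "
                + consumer.ackWithTxn("ack", 1L).isCompletedExceptionally());

        consumer.connectionClosed(cnx);
        System.out.println("disconnected, ack failed: "
                + consumer.ackWithTxn("ack", 2L).isCompletedExceptionally());
    }
}
```

With this shape the caller receives an exceptionally completed future instead of an exception thrown on the calling thread. Whether to fail fast like this or to queue the ack until the reconnect finishes is a design choice this sketch does not settle.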
**To Reproduce**
Steps to reproduce the behavior:
1. Create a consumer.
2. Restart the broker.
3. Use the consumer to acknowledge a message with a transaction.
4. See the error.
Alternatively, the following test reproduces the error.
```java
public void testName() throws Exception {
    String topic = NAMESPACE1 + "/test1";

    @Cleanup
    Producer<byte[]> producer = pulsarClient
            .newProducer(Schema.BYTES)
            .topic(topic)
            .sendTimeout(0, TimeUnit.SECONDS)
            .create();

    @Cleanup
    Consumer<byte[]> consumer = pulsarClient
            .newConsumer()
            .topic(topic)
            .subscriptionName("sub")
            .subscribe();
    consumer.getSubscription();

    for (int i = 0; i < 10; i++) {
        producer.newMessage().value(Bytes.toBytes(i)).send();
    }

    // Fetch the consumer's current connection through the non-public cnx() method.
    Method method = ConsumerImpl.class.getDeclaredMethod("cnx");
    method.setAccessible(true);
    ClientCnx cnx = (ClientCnx) method.invoke(consumer);

    // Simulate a disconnect by invoking connectionClosed(ClientCnx) via reflection.
    Method method1 = ConsumerImpl.class.getDeclaredMethod("connectionClosed", ClientCnx.class);
    method1.setAccessible(true);
    method1.invoke(consumer, cnx);

    // Acknowledge each message within a transaction while the consumer is reconnecting.
    for (int i = 0; i < 10; i++) {
        Message<byte[]> message = consumer.receive();
        Transaction transaction = pulsarClient
                .newTransaction()
                .withTransactionTimeout(5, TimeUnit.SECONDS)
                .build().get();
        consumer.acknowledgeAsync(message.getMessageId(), transaction);
        transaction.commit().get();
    }
}
```
**Screenshots**
The call sequence can be inferred from the log:
1. channelInactive
2. connectionClosed
3. doAcknowledge
4. doTransactionAcknowledgeForResponse
<img width="1788" alt="image" src="https://user-images.githubusercontent.com/55571188/174433454-36e792ba-8fb5-4559-a3fe-9a4c7246745f.png">
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] congbobo184 closed issue #16124: [Bug][Client] NullPointerException for doTransactionAcknowledgeForResponse
Posted by GitBox <gi...@apache.org>.
congbobo184 closed issue #16124: [Bug][Client] NullPointerException for doTransactionAcknowledgeForResponse
URL: https://github.com/apache/pulsar/issues/16124