You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/20 08:28:13 UTC

[GitHub] [pulsar] liangyepianzhou opened a new pull request, #16142: [fix][txn] Fix NPE when ack message with transaction at cnx = null

liangyepianzhou opened a new pull request, #16142:
URL: https://github.com/apache/pulsar/pull/16142

   ## Motivation
   When a channel is inactive, connectHandler will set the cnx = null and reconnect.
   At this time, consumers use transaction to ack messages will report NPE.
   ## Modification
   Return exception when cnx = null.
   
   **Why not use a queue to store operations?**
   1. If we use a queue to store op, we need to take care of the timeout of the op. And the lock is required.
   2. If the connection time is long or there is a BUG client that has not been connected, the client will crash. 
    
   
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 merged pull request #16142: [fix][txn] Fix NPE when ack message with transaction at cnx = null

Posted by GitBox <gi...@apache.org>.
congbobo184 merged PR #16142:
URL: https://github.com/apache/pulsar/pull/16142


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #16142: [fix][txn] Fix NPE when ack message with transaction at cnx = null

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #16142:
URL: https://github.com/apache/pulsar/pull/16142#discussion_r901523556


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##########
@@ -997,6 +1000,48 @@ public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception {
         transaction.commit().get();
     }
 
+    @Test
+    public void testGetConnectExceptionForAckMsgWhenCnxIsNull() throws Exception {
+        String topic = NAMESPACE1 + "/testGetConnectExceptionForAckMsgWhenCnxIsNull";
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer(Schema.BYTES)
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient
+                .newConsumer()
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().value(Bytes.toBytes(i)).send();
+        }
+        Method method = ConsumerImpl.class.getDeclaredMethod("cnx");
+        method.setAccessible(true);
+        ClientCnx cnx = (ClientCnx) method.invoke(consumer);
+        Method method1 = ConsumerImpl.class.getDeclaredMethod("connectionClosed", ClientCnx.class);
+        method1.setAccessible(true);
+        method1.invoke(consumer, cnx);

Review Comment:
   ```suggestion
           ClientCnx cnx = Whitebox.invokeMethod(consumer, "cnx");
           Whitebox.invokeMethod(consumer, "connectionClosed", cnx);
   ```
   We can use `Whitebox` to simplify the reflect operation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] gaoran10 commented on a diff in pull request #16142: [fix][txn] Fix NPE when ack message with transaction at cnx = null

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on code in PR #16142:
URL: https://github.com/apache/pulsar/pull/16142#discussion_r901528330


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2679,7 +2679,14 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
         } else {
             unAckedMessageTracker.remove(messageId);
         }
-        return cnx().newAckForReceipt(cmd, requestId);
+        ClientCnx cnx = cnx();
+        if (cnx == null) {
+            return FutureUtil.failedFuture(new PulsarClientException
+                    .ConnectException("Failed to ack message [" + messageId + "] "
+                    + "for transaction [" + txnID + "] due to cnx = null"));

Review Comment:
   ```suggestion
                       + "for transaction [" + txnID + "] due to consumer connect fail, consumer state: " + getState()));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org