Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/01 08:28:47 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #12521: [Transaction]Txn client check timeout

codelipenghui commented on a change in pull request #12521:
URL: https://github.com/apache/pulsar/pull/12521#discussion_r740033610



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
##########
@@ -960,4 +960,36 @@ public void oneTransactionOneTopicWithMultiSubTest() throws Exception {
         }
         assertTrue(flag);
     }
+
+    @Test
+    public void testTxnTimeOutInClient() throws Exception{
+        String topic = NAMESPACE1 + "/testTxnTimeOutInClient";
+        Producer producer = pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer")
+                .topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
+        Consumer consumer = pulsarClient.newConsumer(Schema.STRING).consumerName("testTxnTimeOut_consumer")
+                .topic(topic).subscriptionName("testTxnTimeOut_sub").subscribe();
+
+        Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
+                .build().get();
+        producer.newMessage().send();
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.ABORTED);
+        });
+
+        try {
+            producer.newMessage(transaction).send();
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(e.getCause().getCause() instanceof TransactionCoordinatorClientException

Review comment:
       This will confuse users because they did not abort the transaction themselves; we should use a TransactionTimeoutException instead.
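
A minimal sketch of the distinction suggested above, so that a timed-out transaction surfaces a different error than one with any other unexpected state. Every class name here (`TxnClientException`, `TransactionTimeoutException`, `TxnState`, `TxnGuard`) is a hypothetical stand-in, and the `TIMED_OUT` state is an assumption of this sketch; none of it is the actual Pulsar client code.

```java
// Hypothetical stand-ins for illustration; these are not the Pulsar client classes.
class TxnClientException extends Exception {
    TxnClientException(String message) {
        super(message);
    }
}

// Raised when the transaction expired on its own, as opposed to an explicit abort.
class TransactionTimeoutException extends TxnClientException {
    TransactionTimeoutException(String txnId) {
        super("[" + txnId + "] transaction timed out");
    }
}

// Assumed state set; TIMED_OUT is an assumption of this sketch.
enum TxnState { OPEN, ABORTED, TIMED_OUT }

class TxnGuard {
    // Rejects operations on a transaction that is no longer open,
    // choosing the exception type that matches the cause.
    static void ensureOpen(String txnId, TxnState state) throws TxnClientException {
        if (state == TxnState.TIMED_OUT) {
            throw new TransactionTimeoutException(txnId);
        }
        if (state != TxnState.OPEN) {
            throw new TxnClientException("[" + txnId + "] with unexpected state: " + state);
        }
    }
}
```

With a split like this, a test could assert on the timeout-specific type rather than on a generic status exception.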

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -490,6 +491,15 @@ public void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUni
         if (null != txn) {
             checkArgument(txn instanceof TransactionImpl);
             txnImpl = (TransactionImpl) txn;
+           if (txnImpl.getState() != TransactionImpl.State.OPEN) {
+               CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+               completableFuture
+                       .completeExceptionally(new TransactionCoordinatorClientException
+                               .InvalidTxnStatusException("["+ txn.getTxnID().toString() +"] with unexpected state : "

Review comment:
       It would be better to define a template error message for InvalidTxnStatusException, for example with a String formatter, so that the formatter can be reused.
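
One way the template could look, as a rough sketch only. The class below is a hypothetical stand-in for `TransactionCoordinatorClientException.InvalidTxnStatusException`; the constructor signature and the exact message wording (including the "expected state" part) are assumptions, not what the PR contains.

```java
// Hypothetical stand-in; not the actual Pulsar exception class.
class InvalidTxnStatusException extends Exception {
    // Single reusable template, so every call site produces the same message shape.
    private static final String MESSAGE_TEMPLATE =
            "[%s] with unexpected state: %s, expected state: %s";

    InvalidTxnStatusException(String txnId, String actualState, String expectedState) {
        super(String.format(MESSAGE_TEMPLATE, txnId, actualState, expectedState));
    }
}
```

Call sites such as the one in `ConsumerBase` would then pass only the transaction ID and the two states instead of concatenating the message by hand.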




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.