Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/07 07:42:46 UTC

[GitHub] [pulsar] congbobo184 opened a new pull request, #15061: [fix][txn]: fix transaction producer stuck problem

congbobo184 opened a new pull request, #15061:
URL: https://github.com/apache/pulsar/pull/15061

   ### Motivation
   
   When transaction buffer recovery fails, we don't remove the producer future from the producer map. If the producer then reconnects to this broker, the broker logs:
   ```
   pulsar-io-5-1] WARN  org.apache.pulsar.broker.service.ServerCnx - [/10.124.4.69:49108][persistent://public/default/s_topic-partition-5] Producer with id is already present on the connection, producerId=45",2022-04-02T06:34:17.042900012Z
   ```
   If the client doesn't restart, the producer cannot be recovered and stays stuck.
   ### Modifications
   
   When topic transaction buffer recovery fails, remove the producer from the map.
   
   In the master branch, https://github.com/apache/pulsar/pull/14467 has already fixed this problem.
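
The idea behind the fix can be sketched in isolation. The class below is a hypothetical stand-in, not Pulsar's actual `ServerCnx`: the names `ProducerRegistrySketch`, `addProducer`, and `hasProducer` are assumptions made for illustration, and the producer is represented by a plain string. It only shows the pattern described above: when the recovery future fails, the pending entry is removed from the map so that a reconnect with the same producer id is not rejected.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a broker connection; not Pulsar's real ServerCnx.
public class ProducerRegistrySketch {
    // producerId -> future that completes once the producer is ready
    private final ConcurrentMap<Long, CompletableFuture<String>> producers = new ConcurrentHashMap<>();

    /** Registers a producer whose readiness depends on transaction buffer recovery. */
    public CompletableFuture<String> addProducer(long producerId, CompletableFuture<Void> bufferRecovery) {
        CompletableFuture<String> producerFuture = new CompletableFuture<>();
        CompletableFuture<String> existing = producers.putIfAbsent(producerId, producerFuture);
        if (existing != null) {
            // Mirrors the symptom in the log line: the id is still registered.
            CompletableFuture<String> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(new IllegalStateException(
                    "Producer with id is already present on the connection, producerId=" + producerId));
            return rejected;
        }
        bufferRecovery.whenComplete((ignored, error) -> {
            if (error != null) {
                // The fix: drop the stale entry so a later reconnect can register again.
                producers.remove(producerId, producerFuture);
                producerFuture.completeExceptionally(error);
            } else {
                producerFuture.complete("producer-" + producerId);
            }
        });
        return producerFuture;
    }

    public boolean hasProducer(long producerId) {
        return producers.containsKey(producerId);
    }
}
```

Without the `producers.remove(...)` call in the failure branch, the second `addProducer` call for the same id would keep hitting the "already present" path until the connection is closed, which matches the stuck-producer behaviour described in the motivation.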
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
   Check the box below and label this PR (if you have committer privilege).
   
   Need to update docs? 
     
   - [x] `no-need-doc` 
   bug fix.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 merged pull request #15061: [fix][txn]: fix transaction producer stuck problem

Posted by GitBox <gi...@apache.org>.
congbobo184 merged PR #15061:
URL: https://github.com/apache/pulsar/pull/15061




[GitHub] [pulsar] congbobo184 commented on a diff in pull request #15061: [fix][txn]: fix transaction producer stuck problem

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #15061:
URL: https://github.com/apache/pulsar/pull/15061#discussion_r845061819


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java:
##########
@@ -569,4 +570,52 @@ public void testTransactionBufferNoSnapshotCloseReader() throws Exception{
         assertTrue(stats.getSubscriptions().keySet().contains("__compaction"));
     }
 
+    @Test
+    public void transactionBufferRecoverFailRemoveProducerFuture() throws Exception {
+        String topic = NAMESPACE1 + "/transactionBufferRecoverFailRemoveProducerFuture";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+
+        // txn buffer init success
+        Transaction txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+        producer.newMessage(txn).value("test".getBytes()).sendAsync();
+        producer.newMessage(txn).value("test".getBytes()).sendAsync();
+        txn.commit().get();
+
+        PersistentTopic originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
+        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) originalTopic.getTransactionBuffer();
+
+        CompletableFuture<Void> bufferFuture = new CompletableFuture<>();
+        bufferFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException("test"));
+
+        // set fail future to topic transaction buffer
+        Field field = TopicTransactionBuffer.class.getDeclaredField("transactionBufferFuture");
+        field.setAccessible(true);
+        field.set(topicTransactionBuffer, bufferFuture);

Review Comment:
   cool!





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #15061: [fix][txn]: fix transaction producer stuck problem

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #15061:
URL: https://github.com/apache/pulsar/pull/15061#discussion_r844871548


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java:
##########
@@ -569,4 +570,52 @@ public void testTransactionBufferNoSnapshotCloseReader() throws Exception{
         assertTrue(stats.getSubscriptions().keySet().contains("__compaction"));
     }
 
+    @Test
+    public void transactionBufferRecoverFailRemoveProducerFuture() throws Exception {
+        String topic = NAMESPACE1 + "/transactionBufferRecoverFailRemoveProducerFuture";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+
+        // txn buffer init success
+        Transaction txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+        producer.newMessage(txn).value("test".getBytes()).sendAsync();
+        producer.newMessage(txn).value("test".getBytes()).sendAsync();
+        txn.commit().get();
+
+        PersistentTopic originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
+        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) originalTopic.getTransactionBuffer();
+
+        CompletableFuture<Void> bufferFuture = new CompletableFuture<>();
+        bufferFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException("test"));
+
+        // set fail future to topic transaction buffer
+        Field field = TopicTransactionBuffer.class.getDeclaredField("transactionBufferFuture");
+        field.setAccessible(true);
+        field.set(topicTransactionBuffer, bufferFuture);

Review Comment:
   Small tip: we can use `Whitebox.setInternalState(topicTransactionBuffer, "transactionBufferFuture", bufferFuture);` to simplify the reflection.
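
For readers unfamiliar with this kind of helper, here is a minimal sketch of what a set-a-private-field utility does, written with plain `java.lang.reflect`. It is not PowerMock's implementation: the class `ReflectionSketch`, its nested `Buffer` type, and this `setInternalState` method are hypothetical names used only for illustration.

```java
import java.lang.reflect.Field;

public class ReflectionSketch {
    // Hypothetical target type with a private field, standing in for the buffer under test.
    static class Buffer {
        private String state = "initial";

        String state() {
            return state;
        }
    }

    /** Sets a (possibly private) field by name, searching up the class hierarchy. */
    static void setInternalState(Object target, String fieldName, Object value) {
        Class<?> type = target.getClass();
        while (type != null) {
            try {
                Field field = type.getDeclaredField(fieldName);
                field.setAccessible(true);
                field.set(target, value);
                return;
            } catch (NoSuchFieldException e) {
                // Not declared on this class; try the superclass.
                type = type.getSuperclass();
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot set field " + fieldName, e);
            }
        }
        throw new IllegalArgumentException("No field named " + fieldName);
    }
}
```

Calling `ReflectionSketch.setInternalState(buffer, "state", "failed")` replaces the three reflective lines (look up the field, make it accessible, set it) with a single call, which is the simplification the tip is pointing at.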





[GitHub] [pulsar] congbobo184 commented on pull request #15061: [fix][txn]: fix transaction producer stuck problem

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on PR #15061:
URL: https://github.com/apache/pulsar/pull/15061#issuecomment-1092361852

   @Jason918 there is a conflict.

