Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/21 01:49:40 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #14760: [fix][txn]: fix transaction buffer snaphost broker persistent close issue

codelipenghui commented on a change in pull request #14760:
URL: https://github.com/apache/pulsar/pull/14760#discussion_r830711171



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
##########
@@ -502,6 +502,32 @@ public void testTransactionBufferRecoverThrowBrokerMetadataException() throws Ex
 
         producer.newMessage(txn).value("test".getBytes()).sendAsync();
         txn.commit().get();
+
+        PersistentTopic newTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        completableFuture.completeExceptionally(new PulsarClientException.BrokerPersistenceException("test"));
+        doReturn(completableFuture)
+                .when(transactionBufferSnapshotService).createReader(any());

Review comment:
       Interesting. Why would creating a reader fail with a BrokerPersistenceException? Creating a reader does not publish any messages to the topic.
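
       For context, the stubbing in the hunk above amounts to handing the caller a future that has already failed. A minimal sketch in plain Java, assuming a hypothetical `StubPersistenceException` as a stand-in for `PulsarClientException.BrokerPersistenceException` and leaving Mockito out entirely:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FailedFutureSketch {
    // Hypothetical stand-in for PulsarClientException.BrokerPersistenceException.
    static class StubPersistenceException extends Exception {
        StubPersistenceException(String msg) {
            super(msg);
        }
    }

    // Mirrors the test setup: a future that is already completed exceptionally.
    static CompletableFuture<Void> failedFuture() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new StubPersistenceException("test"));
        return future;
    }

    // Returns the failure a caller observes through get(), or null if there is none.
    static Throwable observedCause(CompletableFuture<?> future) throws InterruptedException {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            // get() wraps the original failure; the cause is the injected exception.
            return e.getCause();
        }
    }

    public static void main(String[] args) throws Exception {
        Throwable cause = observedCause(failedFuture());
        System.out.println(cause.getClass().getSimpleName() + ": " + cause.getMessage());
    }
}
```

       The sketch only shows that the injected exception is what the caller sees. It does not show that a real reader creation can fail this way, which is the open question.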

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -178,8 +178,9 @@ public void handleTxnEntry(Entry entry) {
                     }
 
                     @Override
-                    public void recoverExceptionally(Exception e) {
-                        if (e instanceof PulsarClientException.BrokerMetadataException) {
+                    public void recoverExceptionally(Throwable e) {
+                        if (e instanceof PulsarClientException.BrokerMetadataException
+                                || e instanceof PulsarClientException.BrokerPersistenceException) {

Review comment:
       Is the BrokerPersistenceException actually raised during the transaction buffer recovery stage? I would expect it to be raised when adding entries to the transaction buffer instead.
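
       To make the question concrete, the widened condition can be sketched in isolation. The two exception classes below are hypothetical stand-ins for the `PulsarClientException` subclasses, and the body of the branch is not part of the hunk, so the sketch only reports whether the branch would be taken:

```java
public class RecoverCheckSketch {
    // Hypothetical stand-in for PulsarClientException.BrokerMetadataException.
    static class StubMetadataException extends Exception {
    }

    // Hypothetical stand-in for PulsarClientException.BrokerPersistenceException.
    static class StubPersistenceException extends Exception {
    }

    // Mirrors the condition in the changed recoverExceptionally(Throwable e).
    static boolean takesSpecialBranch(Throwable e) {
        return e instanceof StubMetadataException
                || e instanceof StubPersistenceException;
    }

    public static void main(String[] args) {
        System.out.println(takesSpecialBranch(new StubMetadataException()));
        System.out.println(takesSpecialBranch(new StubPersistenceException()));
        System.out.println(takesSpecialBranch(new IllegalStateException("other")));
    }
}
```

       With the change, any persistence failure reaching the recovery callback takes the same path as a metadata failure, so it matters which stage can actually produce it.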




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
