You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/25 06:21:36 UTC

[pulsar] branch branch-2.9 updated (06a4678 -> f0a2171)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 06a4678  [fix][test]: fix flaky test testTransactionBufferRecoverThrowPulsarClientException (#14846)
     new c6f17d3  [fix][txn]: fix transaction buffer no snapshot close recover reader (#14830)
     new f0a2171  [fix][txn]: fix transaction buffer recover throw cursor already close (#14807)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../buffer/impl/TopicTransactionBuffer.java        | 25 ++++++++------
 .../TopicTransactionBufferRecoverTest.java         | 39 +++++++++++++++++-----
 .../pulsar/broker/transaction/TransactionTest.java | 12 +++++++
 .../coordinator/impl/MLTransactionLogImpl.java     |  2 ++
 4 files changed, 58 insertions(+), 20 deletions(-)

[pulsar] 02/02: [fix][txn]: fix transaction buffer recover throw cursor already close (#14807)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f0a2171cbad894cec5bfb2d4de31cb8de32a3183
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Mar 25 14:16:29 2022 +0800

    [fix][txn]: fix transaction buffer recover throw cursor already close (#14807)
    
    ### Motivation
    When Transaction buffer recover fail throw CursorAlreadyClosedException, we should stop the recover op. the cursor was been closed, the transaction buffer was been closed, so we should stop the recover op, in order to release thread resources
    like https://github.com/apache/pulsar/pull/14781
    
    (cherry picked from commit aef5f6d5e2d44f84c9c358f1d9dd9db1108a9d99)
---
 .../transaction/buffer/impl/TopicTransactionBuffer.java      |  3 ++-
 .../apache/pulsar/broker/transaction/TransactionTest.java    | 12 ++++++++++++
 .../transaction/coordinator/impl/MLTransactionLogImpl.java   |  2 ++
 3 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 9ea3b42..66ce8f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -734,7 +734,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
             if (recover.topic.getManagedLedger().getConfig().isAutoSkipNonRecoverableData()
                     && exception instanceof ManagedLedgerException.NonRecoverableLedgerException
-                    || exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+                    || exception instanceof ManagedLedgerException.ManagedLedgerFencedException
+                    || exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
                 isReadable = false;
             } else {
                 outstandingReadsRequests.decrementAndGet();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 6fd1e92..231c183 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -568,6 +568,18 @@ public class TransactionTest extends TransactionTestBase {
         Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
                 assertEquals(buffer2.getStats().state, "Ready"));
         managedCursors.removeCursor("transaction-buffer-sub");
+
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
+            callback.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("test"), null);
+            return null;
+        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+
+        managedCursors.add(managedCursor);
+        TransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic);
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
+                assertEquals(buffer3.getStats().state, "Ready"));
+        managedCursors.removeCursor("transaction-buffer-sub");
     }
 
     @Test
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 2a13f95..f14784e 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -274,6 +274,8 @@ public class MLTransactionLogImpl implements TransactionLog {
                     || exception instanceof ManagedLedgerException.ManagedLedgerFencedException
                     || exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
                 isReadable = false;
+            } else {
+                outstandingReadsRequests.decrementAndGet();
             }
             log.error("Transaction log init fail error!", exception);
         }

[pulsar] 01/02: [fix][txn]: fix transaction buffer no snapshot close recover reader (#14830)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c6f17d3ce41c2b24a8a1d4f891d59b4ffc403e02
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Mar 25 13:52:40 2022 +0800

    [fix][txn]: fix transaction buffer no snapshot close recover reader (#14830)
    
    ### Motivation
    now transaction buffer recover no snapshot, we don't close the reader, it will produce the problem of OOM
    
    (cherry picked from commit 7a78b505f3f2d9febbcd6a161102b5a584f475c8)
---
 .../buffer/impl/TopicTransactionBuffer.java        | 22 ++++++------
 .../TopicTransactionBufferRecoverTest.java         | 39 +++++++++++++++++-----
 2 files changed, 42 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 7ed656f..9ea3b42 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -181,6 +181,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                     @Override
                     public void recoverExceptionally(Throwable e) {
 
+                        log.warn("Closing topic {} due to read transaction buffer snapshot while recovering the "
+                                + "transaction buffer throw exception", topic.getName(), e);
                         // when create reader or writer fail throw PulsarClientException,
                         // should close this topic and then reinit this topic
                         if (e instanceof PulsarClientException) {
@@ -189,8 +191,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                             // the tc do op will retry
                             transactionBufferFuture.completeExceptionally
                                     (new BrokerServiceException.ServiceUnitNotReadyException(e.getMessage(), e));
-                            log.warn("Closing topic {} due to read transaction buffer snapshot while recovering the "
-                                    + "transaction buffer throw exception", topic.getName(), e);
                         } else {
                             transactionBufferFuture.completeExceptionally(e);
                         }
@@ -590,6 +590,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                                     }
                                 }
                                 if (!hasSnapshot) {
+                                    closeReader(reader);
                                     callBack.noNeedToRecover();
                                     return;
                                 }
@@ -597,16 +598,10 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                                 log.error("[{}]Transaction buffer recover fail when read "
                                         + "transactionBufferSnapshot!", topic.getName(), pulsarClientException);
                                 callBack.recoverExceptionally(pulsarClientException);
-                                reader.closeAsync().exceptionally(e -> {
-                                    log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
-                                    return null;
-                                });
+                                closeReader(reader);
                                 return;
                             }
-                            reader.closeAsync().exceptionally(e -> {
-                                log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
-                                return null;
-                            });
+                            closeReader(reader);
 
                             ManagedCursor managedCursor;
                             try {
@@ -679,6 +674,13 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
             log.error("Transaction buffer recover fail when recover transaction entry!", e);
             this.exceptionNumber.getAndIncrement();
         }
+
+        private void closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) {
+            reader.closeAsync().exceptionally(e -> {
+                log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
+                return null;
+            });
+        }
     }
 
     static class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 392a21f..c349173 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -18,6 +18,16 @@
  */
 package org.apache.pulsar.broker.transaction;
 
+import static org.apache.pulsar.common.events.EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -59,6 +69,7 @@ import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
@@ -66,15 +77,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
 
 @Slf4j
 public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
@@ -548,4 +550,23 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         producer.close();
     }
 
+
+    @Test
+    public void testTransactionBufferNoSnapshotCloseReader() throws Exception{
+        String topic = NAMESPACE1 + "/test";
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer")
+                .topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
+
+        admin.topics().unload(topic);
+
+        // unload success, all readers have been closed except for the compaction sub
+        producer.send("test");
+        TopicStats stats = admin.topics().getStats(NAMESPACE1 + "/" + TRANSACTION_BUFFER_SNAPSHOT);
+
+        // except for the compaction sub
+        assertEquals(stats.getSubscriptions().size(), 1);
+        assertTrue(stats.getSubscriptions().keySet().contains("__compaction"));
+    }
+
 }