You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 03:02:43 UTC

[pulsar] 12/15: [Transaction]stop TP replaying with Exception (#12700)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7ca4552ac4aaeb81b409764da34cccee0e81a97b
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Dec 17 22:28:29 2021 +0800

    [Transaction]stop TP replaying with Exception (#12700)
    
    When MLPendingAckStore replaying, if any ledger was deleted from bookkeeper, or ManagerLedger was fenced, MLPendingAckStore will not stop recovering and continue to report the exception.
    
    End replaying when there is no ledger to read or the managerLedger is fenced.
    
    Add a unit test.
    
    (cherry picked from commit a962137f530cd2d6c2315749270a1d2cae8b1cc2)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  6 +++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  1 +
 .../mledger/impl/ManagedCursorContainerTest.java   |  5 ++
 .../pendingack/impl/MLPendingAckStore.java         | 25 +++++----
 .../pulsar/broker/transaction/TransactionTest.java | 61 ++++++++++++++++++++++
 .../coordinator/impl/MLTransactionLogImpl.java     |  2 +-
 6 files changed, 89 insertions(+), 11 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 72ee1a1..d1fb90a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -710,4 +710,10 @@ public interface ManagedCursor {
      * @return if read position changed
      */
     boolean checkAndUpdateReadPositionChanged();
+
+    /**
+     * Checks if the cursor is closed.
+     * @return whether this cursor is closed.
+     */
+    public boolean isClosed();
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 521ec7a..3d382ad 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -837,6 +837,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
     }
 
+    @Override
     public boolean isClosed() {
         return state == State.Closed || state == State.Closing;
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 57e1964..af30c9c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -385,6 +385,11 @@ public class ManagedCursorContainerTest {
         public boolean checkAndUpdateReadPositionChanged() {
             return false;
         }
+
+        @Override
+        public boolean isClosed() {
+            return false;
+        }
     }
 
     @Test
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index fb88878..1592318 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -34,7 +34,6 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
@@ -303,13 +302,12 @@ public class MLPendingAckStore implements PendingAckStore {
         @Override
         public void run() {
             try {
-                while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0) {
-                    if (((ManagedCursorImpl) cursor).isClosed()) {
-                        log.warn("[{}] MLPendingAckStore cursor have been closed, close replay thread.",
-                                cursor.getManagedLedger().getName());
-                        return;
-                    }
-                    fillEntryQueueCallback.fillQueue();
+                if (cursor.isClosed()) {
+                    log.warn("[{}] MLPendingAckStore cursor have been closed, close replay thread.",
+                            cursor.getManagedLedger().getName());
+                    return;
+                }
+                while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0 && fillEntryQueueCallback.fillQueue()) {
                     Entry entry = entryQueue.poll();
                     if (entry != null) {
                         ByteBuf buffer = entry.getDataBuffer();
@@ -361,15 +359,17 @@ public class MLPendingAckStore implements PendingAckStore {
 
     class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {
 
+        private volatile boolean isReadable = true;
         private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
 
-        void fillQueue() {
+        boolean fillQueue() {
             if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
                 if (cursor.hasMoreEntries()) {
                     outstandingReadsRequests.incrementAndGet();
                     readAsync(100, this);
                 }
             }
+            return isReadable;
         }
 
         @Override
@@ -389,7 +389,12 @@ public class MLPendingAckStore implements PendingAckStore {
 
         @Override
         public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
-            log.error("MLPendingAckStore stat reply fail!", exception);
+            if (managedLedger.getConfig().isAutoSkipNonRecoverableData()
+                    && exception instanceof ManagedLedgerException.NonRecoverableLedgerException
+                    || exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+                isReadable = false;
+            }
+            log.error("MLPendingAckStore of topic [{}] stat reply fail!", managedLedger.getName(), exception);
             outstandingReadsRequests.decrementAndGet();
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 90ddc4b..9cad6fd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -49,6 +49,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -60,8 +61,10 @@ import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
+import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -76,6 +79,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -503,6 +507,63 @@ public class TransactionTest extends TransactionTestBase {
     }
 
     @Test
+    public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{
+        String topic = NAMESPACE1 + "/testEndTPRecoveringWhenManagerLedgerDisReadable";
+        admin.topics().createNonPartitionedTopic(topic);
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .producerName("test")
+                .enableBatching(false)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .topic(topic)
+                .create();
+        producer.newMessage().send();
+
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
+                .getTopic(topic, false).get().get();
+        persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
+        PersistentSubscription persistentSubscription = (PersistentSubscription) persistentTopic
+                .createSubscription("test",
+                CommandSubscribe.InitialPosition.Earliest, false).get();
+
+        ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
+        doReturn(true).when(managedCursor).hasMoreEntries();
+        doReturn(false).when(managedCursor).isClosed();
+        doReturn(new PositionImpl(-1, -1)).when(managedCursor).getMarkDeletedPosition();
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
+            callback.readEntriesFailed(new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"),
+                    null);
+            return null;
+        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+
+        TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
+        doReturn(CompletableFuture.completedFuture(
+                new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null)))
+                .when(pendingAckStoreProvider).newPendingAckStore(any());
+        doReturn(CompletableFuture.completedFuture(true)).when(pendingAckStoreProvider).checkInitializedBefore(any());
+
+        Class<PulsarService> pulsarServiceClass = PulsarService.class;
+        Field field = pulsarServiceClass.getDeclaredField("transactionPendingAckStoreProvider");
+        field.setAccessible(true);
+        field.set(getPulsarServiceList().get(0), pendingAckStoreProvider);
+
+        PendingAckHandleImpl pendingAckHandle1 = new PendingAckHandleImpl(persistentSubscription);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(pendingAckHandle1.getStats().state, "Ready"));
+
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
+            callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null);
+            return null;
+        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+
+        PendingAckHandleImpl pendingAckHandle2 = new PendingAckHandleImpl(persistentSubscription);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(pendingAckHandle2.getStats().state, "Ready"));
+    }
+
+    @Test
     public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
         String topic = NAMESPACE1 + "/testEndTBRecoveringWhenManagerLedgerDisReadable";
         admin.topics().createNonPartitionedTopic(topic);
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index e154bb8..8bf2ebf 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -240,7 +240,7 @@ public class MLTransactionLogImpl implements TransactionLog {
     class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {
 
         private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
-        private boolean isReadable = true;
+        private volatile boolean isReadable = true;
 
         boolean fillQueue() {
             if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {