You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 00:56:59 UTC

[pulsar] 13/15: [Transaction]stop TC replaying with exception (#12705)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f3bdaec1f38142d1e7e624424a924769b7382594
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Sat Nov 20 00:15:52 2021 +0800

    [Transaction]stop TC replaying with exception (#12705)
    
    While MLTransactionLogImpl is replaying, if a ledger has been deleted from BookKeeper or the ManagedLedger has been fenced, MLTransactionLogImpl does not stop recovering and keeps reporting the exception.
    
    Stop replaying when a read fails with a non-recoverable ledger error (and autoSkipNonRecoverableData is enabled) or when the ManagedLedger is fenced.
    
    (cherry picked from commit 06f1a91c05d1e11cd9ce8c85e042224e57495390)
---
 .../pulsar/broker/transaction/TransactionTest.java | 81 ++++++++++++++++++++--
 .../coordinator/impl/MLTransactionLogImpl.java     | 10 ++-
 2 files changed, 85 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index a237314..e4975d9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -21,6 +21,12 @@ package org.apache.pulsar.broker.transaction;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
 import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
@@ -28,6 +34,8 @@ import static org.testng.Assert.fail;
 
 import io.netty.buffer.Unpooled;
 import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -37,6 +45,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
@@ -73,6 +83,12 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogInterceptor;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -411,11 +427,11 @@ public class TransactionTest extends TransactionTestBase {
     }
 
     @Test
-    public void testMaxReadPositionForNormalPublish() throws Exception{
+    public void testMaxReadPositionForNormalPublish() throws Exception {
         String topic = "persistent://" + NAMESPACE1 + "/NormalPublish";
         admin.topics().createNonPartitionedTopic(topic);
         PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
-                  .getTopic(topic, false).get().get();
+                .getTopic(topic, false).get().get();
 
         TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
         PulsarClient noTxnClient = PulsarClient.builder().enableTransaction(false)
@@ -443,7 +459,7 @@ public class TransactionTest extends TransactionTestBase {
                 .sendTimeout(0, TimeUnit.SECONDS)
                 .create();
 
-        Awaitility.await().untilAsserted(() ->Assert.assertTrue(topicTransactionBuffer.checkIfReady()));
+        Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicTransactionBuffer.checkIfReady()));
         //test publishing txn messages will not change maxReadPosition if don`t commit or abort.
         Transaction transaction = pulsarClient.newTransaction()
                 .withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
@@ -483,5 +499,62 @@ public class TransactionTest extends TransactionTestBase {
         Assert.assertEquals(position5.getLedgerId(), messageId4.getLedgerId());
         Assert.assertEquals(position5.getEntryId(), messageId4.getEntryId());
 
-        }
+    }
+
+    @Test
+    public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
+        String topic = NAMESPACE1 + "/testEndTBRecoveringWhenManagerLedgerDisReadable";
+        admin.topics().createNonPartitionedTopic(topic);
+
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
+                .getTopic(topic, false).get().get();
+        persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
+        Map<String, String> map = new HashMap<>();
+        map.put(MLTransactionLogInterceptor.MAX_LOCAL_TXN_ID, "1");
+        persistentTopic.getManagedLedger().setProperties(map);
+
+        ManagedCursor managedCursor = mock(ManagedCursor.class);
+        doReturn(true).when(managedCursor).hasMoreEntries();
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
+            callback.readEntriesFailed(new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"),
+                    null);
+            return null;
+        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+
+        MLTransactionLogImpl mlTransactionLog =
+                new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
+                        persistentTopic.getManagedLedger().getConfig());
+        Class<MLTransactionLogImpl> mlTransactionLogClass = MLTransactionLogImpl.class;
+        Field field = mlTransactionLogClass.getDeclaredField("cursor");
+        field.setAccessible(true);
+        field.set(mlTransactionLog, managedCursor);
+        field = mlTransactionLogClass.getDeclaredField("managedLedger");
+        field.setAccessible(true);
+        field.set(mlTransactionLog, persistentTopic.getManagedLedger());
+
+        TransactionRecoverTracker transactionRecoverTracker = mock(TransactionRecoverTracker.class);
+        doNothing().when(transactionRecoverTracker).appendOpenTransactionToTimeoutTracker();
+        doNothing().when(transactionRecoverTracker).handleCommittingAndAbortingTransaction();
+        TransactionTimeoutTracker timeoutTracker = mock(TransactionTimeoutTracker.class);
+        doNothing().when(timeoutTracker).start();
+        MLTransactionMetadataStore metadataStore1 =
+                new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
+                        mlTransactionLog, timeoutTracker, transactionRecoverTracker);
+
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));
+
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
+            callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null);
+            return null;
+        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+
+        MLTransactionMetadataStore metadataStore2 =
+                new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
+                        mlTransactionLog, timeoutTracker, transactionRecoverTracker);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
+    }
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index c044275..2d11d98 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -281,18 +281,19 @@ public class MLTransactionLogImpl implements TransactionLog {
     class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {
 
         private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
+        private boolean isReadable = true;
 
         boolean fillQueue() {
             if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
                 if (cursor.hasMoreEntries()) {
                     outstandingReadsRequests.incrementAndGet();
                     readAsync(100, this);
-                    return true;
+                    return isReadable;
                 } else {
                     return false;
                 }
             } else {
-                return true;
+                return isReadable;
             }
         }
 
@@ -313,6 +314,11 @@ public class MLTransactionLogImpl implements TransactionLog {
 
         @Override
         public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+            if (managedLedgerConfig.isAutoSkipNonRecoverableData()
+                    && exception instanceof ManagedLedgerException.NonRecoverableLedgerException
+                    || exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+                isReadable = false;
+            }
             log.error("Transaction log init fail error!", exception);
             outstandingReadsRequests.decrementAndGet();
         }