You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/19 06:02:28 UTC

[pulsar] 14/14: [fix][txn]: fix transaction pending ack store managed ledger WriteFail state (#14738)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6d3fbbea4e9aaaf183ff4117d368b19d9ab6ad7e
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Mar 18 21:25:36 2022 +0800

    [fix][txn]: fix transaction pending ack store managed ledger WriteFail state (#14738)
    
    like #10711
    
    ```
    java.util.concurrent.CompletionException: org.apache.pulsar.broker.service.BrokerServiceException$PersistenceException: org.apache.bookkeeper.mledger.ManagedLedgerException$ManagedLedgerAlreadyClosedException: Waiting to recover from failure
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
    	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]
    	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    	at org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore$2.addFailed(MLPendingAckStore.java:286) ~[io.streamnative-pulsar-broker-2.9.2.5.jar:2.9.2.5]
    	at org.apache.bookkeeper.mledger.impl.OpAddEntry.failed(OpAddEntry.java:138) ~[io.streamnative-managed-ledger-2.9.2.5.jar:2.9.2.5]
    	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalAsyncAddEntry(ManagedLedgerImpl.java:743) ~[io.streamnative-managed-ledger-2.9.2.5.jar:2.9.2.5]
    	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncAddEntry$3(ManagedLedgerImpl.java:708) ~[io.streamnative-managed-ledger-2.9.2.5.jar:2.9.2.5]
    	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [io.streamnative-managed-ledger-2.9.2.5.jar:2.9.2.5]
    	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.14.4.jar:4.14.4]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
    	at java.lang.Thread.run(Thread.java:829) [?:?]
    Caused by: org.apache.pulsar.broker.service.BrokerServiceException$PersistenceException: org.apache.bookkeeper.mledger.ManagedLedgerException$ManagedLedgerAlreadyClosedException: Waiting to recover from failure
    	... 10 more
    ```
    
    ## Motivation
    when transaction pending ack managed ledger state become WriteFailed state, should `readyToCreateNewLedger`.
    
    ## implement
    
    append fail check the managedLedger state and the exception do `readyToCreateNewLedger`
    ### Verifying this change
    Add the tests for it
    
    (cherry picked from commit 6c3711c581a33266e7d686a20344c3b41690f0ca)
---
 .../pendingack/impl/MLPendingAckStore.java         |  4 +
 .../pendingack/PendingAckMetadataTest.java         | 98 ++++++++++++++++++++++
 .../coordinator/impl/MLTransactionLogImpl.java     |  6 +-
 3 files changed, 103 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index d869be3..36b775b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -282,6 +282,10 @@ public class MLPendingAckStore implements PendingAckStore {
             public void addFailed(ManagedLedgerException exception, Object ctx) {
                 log.error("[{}][{}] MLPendingAckStore message append fail exception : {}, operation : {}",
                         managedLedger.getName(), ctx, exception, pendingAckMetadataEntry.getPendingAckOp());
+
+                if (exception instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
+                    managedLedger.readyToCreateNewLedger();
+                }
                 buf.release();
                 completableFuture.completeExceptionally(new PersistenceException(exception));
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java
new file mode 100644
index 0000000..c99eee6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack;
+
+import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.CommandAck;
+import org.testng.annotations.Test;
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.fail;
+
+public class PendingAckMetadataTest extends MockedBookKeeperTestCase {
+
+    public PendingAckMetadataTest() {
+        super(3);
+    }
+
+    @Test
+    public void testPendingAckManageLedgerWriteFailState() throws Exception {
+        ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
+        factoryConf.setMaxCacheSize(0);
+
+        String pendingAckTopicName = MLPendingAckStore
+                .getTransactionPendingAckStoreSuffix("test", "test");
+        @Cleanup("shutdown")
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
+
+        CompletableFuture<ManagedLedger> completableFuture = new CompletableFuture<>();
+        factory.asyncOpen(pendingAckTopicName, new AsyncCallbacks.OpenLedgerCallback() {
+            @Override
+            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+                completableFuture.complete(ledger);
+            }
+
+            @Override
+            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
+
+            }
+        }, null);
+
+        ManagedCursor cursor = completableFuture.get().openCursor("test");
+        ManagedCursor subCursor = completableFuture.get().openCursor("test");
+        MLPendingAckStore pendingAckStore =
+                new MLPendingAckStore(completableFuture.get(), cursor, subCursor);
+
+        Field field = MLPendingAckStore.class.getDeclaredField("managedLedger");
+        field.setAccessible(true);
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) field.get(pendingAckStore);
+        field = ManagedLedgerImpl.class.getDeclaredField("STATE_UPDATER");
+        field.setAccessible(true);
+        AtomicReferenceFieldUpdater<ManagedLedgerImpl, ManagedLedgerImpl.State> state =
+                (AtomicReferenceFieldUpdater<ManagedLedgerImpl, ManagedLedgerImpl.State>) field.get(managedLedger);
+        state.set(managedLedger, WriteFailed);
+        try {
+            pendingAckStore.appendAbortMark(new TxnID(1, 1), CommandAck.AckType.Cumulative).get();
+            fail();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause().getCause() instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException);
+        }
+        pendingAckStore.appendAbortMark(new TxnID(1, 1), CommandAck.AckType.Cumulative).get();
+
+        completableFuture.get().close();
+        cursor.close();
+        subCursor.close();
+    }
+
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index a4c347a..dce0b2f 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -31,8 +31,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -163,9 +161,7 @@ public class MLTransactionLogImpl implements TransactionLog {
             @Override
             public void addFailed(ManagedLedgerException exception, Object ctx) {
                 log.error("Transaction log write transaction operation error", exception);
-                if (exception instanceof ManagedLedgerAlreadyClosedException
-                        && managedLedger instanceof ManagedLedgerImpl
-                        && State.WriteFailed == ((ManagedLedgerImpl) managedLedger).getState()) {
+                if (exception instanceof ManagedLedgerAlreadyClosedException) {
                     managedLedger.readyToCreateNewLedger();
                 }
                 buf.release();