You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/19 06:02:28 UTC
[pulsar] 14/14: [fix][txn]: fix transaction pending ack store managed ledger WriteFail state (#14738)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6d3fbbea4e9aaaf183ff4117d368b19d9ab6ad7e
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Mar 18 21:25:36 2022 +0800
[fix][txn]: fix transaction pending ack store managed ledger WriteFail state (#14738)
Similar to #10711; the failure observed is:
```
java.util.concurrent.CompletionException: org.apache.pulsar.broker.service.BrokerServiceException$PersistenceException: org.apache.bookkeeper.mledger.ManagedLedgerException$ManagedLedgerAlreadyClosedException: Waiting to recover from failure
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore$2.addFailed(MLPendingAckStore.java:286) ~[io.streamnative-pulsar-broker-2.9.2.5.jar:2.9.2.5]
at org.apache.bookkeeper.mledger.impl.OpAddEntry.failed(OpAddEntry.java:138) ~[io.streamnative-managed-ledger-2.9.2.5.jar:2.9.2.5]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalAsyncAddEntry(ManagedLedgerImpl.java:743) ~[io.streamnative-managed-ledger-2.9.2.5.jar:2.9.2.5]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncAddEntry$3(ManagedLedgerImpl.java:708) ~[io.streamnative-managed-ledger-2.9.2.5.jar:2.9.2.5]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [io.streamnative-managed-ledger-2.9.2.5.jar:2.9.2.5]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.14.4.jar:4.14.4]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.pulsar.broker.service.BrokerServiceException$PersistenceException: org.apache.bookkeeper.mledger.ManagedLedgerException$ManagedLedgerAlreadyClosedException: Waiting to recover from failure
... 10 more
```
## Motivation
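When the managed ledger backing the transaction pending ack store enters the `WriteFailed` state, the store should call `readyToCreateNewLedger` so that subsequent appends can succeed instead of failing with `ManagedLedgerAlreadyClosedException`.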
## Implementation
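When an append fails, check the exception: if it is a `ManagedLedgerAlreadyClosedException`, call `managedLedger.readyToCreateNewLedger()`. The explicit `WriteFailed` managed-ledger state check previously done in `MLTransactionLogImpl` is removed, so both stores now decide based on the exception type alone.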
### Verifying this change
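Added `PendingAckMetadataTest#testPendingAckManageLedgerWriteFailState`, which forces the managed ledger into the `WriteFailed` state, verifies that the first append fails with `ManagedLedgerAlreadyClosedException`, and verifies that the next append succeeds.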
(cherry picked from commit 6c3711c581a33266e7d686a20344c3b41690f0ca)
---
.../pendingack/impl/MLPendingAckStore.java | 4 +
.../pendingack/PendingAckMetadataTest.java | 98 ++++++++++++++++++++++
.../coordinator/impl/MLTransactionLogImpl.java | 6 +-
3 files changed, 103 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index d869be3..36b775b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -282,6 +282,10 @@ public class MLPendingAckStore implements PendingAckStore {
public void addFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] MLPendingAckStore message append fail exception : {}, operation : {}",
managedLedger.getName(), ctx, exception, pendingAckMetadataEntry.getPendingAckOp());
+
+ if (exception instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
+ managedLedger.readyToCreateNewLedger();
+ }
buf.release();
completableFuture.completeExceptionally(new PersistenceException(exception));
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java
new file mode 100644
index 0000000..c99eee6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack;
+
+import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.CommandAck;
+import org.testng.annotations.Test;
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.fail;
+
+public class PendingAckMetadataTest extends MockedBookKeeperTestCase {
+
+ public PendingAckMetadataTest() {
+ super(3);
+ }
+
+ @Test
+ public void testPendingAckManageLedgerWriteFailState() throws Exception {
+ ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
+ factoryConf.setMaxCacheSize(0);
+
+ String pendingAckTopicName = MLPendingAckStore
+ .getTransactionPendingAckStoreSuffix("test", "test");
+ @Cleanup("shutdown")
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
+
+ CompletableFuture<ManagedLedger> completableFuture = new CompletableFuture<>();
+ factory.asyncOpen(pendingAckTopicName, new AsyncCallbacks.OpenLedgerCallback() {
+ @Override
+ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ completableFuture.complete(ledger);
+ }
+
+ @Override
+ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
+
+ }
+ }, null);
+
+ ManagedCursor cursor = completableFuture.get().openCursor("test");
+ ManagedCursor subCursor = completableFuture.get().openCursor("test");
+ MLPendingAckStore pendingAckStore =
+ new MLPendingAckStore(completableFuture.get(), cursor, subCursor);
+
+ Field field = MLPendingAckStore.class.getDeclaredField("managedLedger");
+ field.setAccessible(true);
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) field.get(pendingAckStore);
+ field = ManagedLedgerImpl.class.getDeclaredField("STATE_UPDATER");
+ field.setAccessible(true);
+ AtomicReferenceFieldUpdater<ManagedLedgerImpl, ManagedLedgerImpl.State> state =
+ (AtomicReferenceFieldUpdater<ManagedLedgerImpl, ManagedLedgerImpl.State>) field.get(managedLedger);
+ state.set(managedLedger, WriteFailed);
+ try {
+ pendingAckStore.appendAbortMark(new TxnID(1, 1), CommandAck.AckType.Cumulative).get();
+ fail();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause().getCause() instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException);
+ }
+ pendingAckStore.appendAbortMark(new TxnID(1, 1), CommandAck.AckType.Cumulative).get();
+
+ completableFuture.get().close();
+ cursor.close();
+ subCursor.close();
+ }
+
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index a4c347a..dce0b2f 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -31,8 +31,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -163,9 +161,7 @@ public class MLTransactionLogImpl implements TransactionLog {
@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
log.error("Transaction log write transaction operation error", exception);
- if (exception instanceof ManagedLedgerAlreadyClosedException
- && managedLedger instanceof ManagedLedgerImpl
- && State.WriteFailed == ((ManagedLedgerImpl) managedLedger).getState()) {
+ if (exception instanceof ManagedLedgerAlreadyClosedException) {
managedLedger.readyToCreateNewLedger();
}
buf.release();