You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/16 11:51:45 UTC
[pulsar] branch branch-2.11 updated: [fix][txn] transaction pending ack store future not completely problem (#18943)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new a968da59abd [fix][txn] transaction pending ack store future not completely problem (#18943)
a968da59abd is described below
commit a968da59abd43a038b973226edbcc55cfa48e340
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Dec 16 17:41:33 2022 +0800
[fix][txn] transaction pending ack store future not completely problem (#18943)
When `MLPendingAckStoreProvider` initializes the PendingAckStore and getting the ManagedLedger config throws an exception, the exception is not handled, so `pendingAckStoreFuture` is never completed. Topic unload uses this future to close the pending ack, so the unload ends up waiting on a future that never completes.
https://github.com/apache/pulsar/blob/3011946a5c3b64ed7c08b6bfb1f6492f8aaaca9c/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java#L114-L115
When getting the ManagedLedger config fails, `pendingAckStoreFuture` is now completed exceptionally via `completeExceptionally()`.
When the pendingAckStore fails to initialize, closing the pendingAckHandle completes successfully right away instead of failing or retrying.
Verification: mock the ManagedLedger config lookup so that it fails, then check that the topic unload still succeeds.
(cherry picked from commit 1d9956cf9fc081b219d1db14eb2686677ea63021)
---
.../service/persistent/PersistentSubscription.java | 2 +-
.../transaction/pendingack/PendingAckHandle.java | 2 +-
.../pendingack/impl/MLPendingAckStoreProvider.java | 25 +++++----
.../pendingack/impl/PendingAckHandleDisabled.java | 2 +-
.../pendingack/impl/PendingAckHandleImpl.java | 36 +++++++++----
.../pulsar/broker/transaction/TransactionTest.java | 2 +-
.../pendingack/PendingAckPersistentTest.java | 59 +++++++++++++++++++++-
7 files changed, 105 insertions(+), 23 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index fcb4135c97e..fe3faf35bc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -877,7 +877,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
if (dispatcher != null && dispatcher.isConsumerConnected()) {
return FutureUtil.failedFuture(new SubscriptionBusyException("Subscription has active consumers"));
}
- return this.pendingAckHandle.close().thenAccept(v -> {
+ return this.pendingAckHandle.closeAsync().thenAccept(v -> {
IS_FENCED_UPDATER.set(this, TRUE);
log.info("[{}][{}] Successfully closed subscription [{}]", topicName, subName, cursor);
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
index d8e16dd4015..b51aea23e6f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
@@ -151,7 +151,7 @@ public interface PendingAckHandle {
*
* @return the future of this operation.
*/
- CompletableFuture<Void> close();
+ CompletableFuture<Void> closeAsync();
/**
* Check if the PendingAckStore is init.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index fdff9f59146..0a4d201a87f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
@@ -131,15 +132,21 @@ public class MLPendingAckStoreProvider implements TransactionPendingAckStoreProv
pendingAckStoreFuture.completeExceptionally(exception);
}
}, () -> true, null);
- });
- }).exceptionally(e -> {
- log.error("Failed to obtain the existence of ManagerLedger with topic and subscription : "
- + originPersistentTopic.getSubscriptions() + " "
- + subscription.getName());
- pendingAckStoreFuture.completeExceptionally(
- e.getCause());
- return null;
- });
+ }).exceptionally(e -> {
+ Throwable t = FutureUtil.unwrapCompletionException(e);
+ log.error("[{}] [{}] Failed to get managedLedger config when init pending ack store!",
+ originPersistentTopic, subscription, t);
+ pendingAckStoreFuture.completeExceptionally(t);
+ return null;
+
+ });
+ }).exceptionally(e -> {
+ Throwable t = FutureUtil.unwrapCompletionException(e);
+ log.error("[{}] [{}] Failed to check the pending ack topic exist when init pending ack store!",
+ originPersistentTopic, subscription, t);
+ pendingAckStoreFuture.completeExceptionally(t);
+ return null;
+ });
return pendingAckStoreFuture;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
index e34628da887..e02502014e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
@@ -93,7 +93,7 @@ public class PendingAckHandleDisabled implements PendingAckHandle {
}
@Override
- public CompletableFuture<Void> close() {
+ public CompletableFuture<Void> closeAsync() {
return CompletableFuture.completedFuture(null);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 38d81d059b7..64652faa6e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -153,9 +153,11 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
completeHandleFuture();
}
})
- .exceptionally(t -> {
+ .exceptionally(e -> {
+ Throwable t = FutureUtil.unwrapCompletionException(e);
changeToErrorState();
exceptionHandleFuture(t);
+ this.pendingAckStoreFuture.completeExceptionally(t);
return null;
});
}
@@ -168,13 +170,13 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
recoverTime.setRecoverStartTime(System.currentTimeMillis());
pendingAckStore.replayAsync(this, internalPinnedExecutor);
- }).exceptionally(e -> {
- acceptQueue.clear();
+ }).exceptionallyAsync(e -> {
+ handleCacheRequest();
changeToErrorState();
log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
exceptionHandleFuture(e.getCause());
return null;
- });
+ }, internalPinnedExecutor);
}
}
}
@@ -927,9 +929,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
if (!this.pendingAckHandleCompletableFuture.isDone()) {
this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
}
- if (recoverTime.getRecoverStartTime() == 0L) {
- return;
- } else {
+ if (recoverTime.getRecoverStartTime() != 0L) {
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}
@@ -963,11 +963,29 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
}
@Override
- public CompletableFuture<Void> close() {
+ public CompletableFuture<Void> closeAsync() {
changeToCloseState();
synchronized (PendingAckHandleImpl.this) {
if (this.pendingAckStoreFuture != null) {
- return this.pendingAckStoreFuture.thenAccept(PendingAckStore::closeAsync);
+ CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ this.pendingAckStoreFuture.whenComplete((pendingAckStore, e) -> {
+ if (e != null) {
+ // init pending ack store fail, close don't need to
+ // retry and throw exception, complete directly
+ closeFuture.complete(null);
+ } else {
+ pendingAckStore.closeAsync().whenComplete((q, ex) -> {
+ if (ex != null) {
+ Throwable t = FutureUtil.unwrapCompletionException(ex);
+ closeFuture.completeExceptionally(t);
+ } else {
+ closeFuture.complete(null);
+ }
+ });
+ }
+ });
+
+ return closeFuture;
} else {
return CompletableFuture.completedFuture(null);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 76b91801a73..1043078d686 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1299,7 +1299,7 @@ public class TransactionTest extends TransactionTestBase {
public Object answer(InvocationOnMock invocation) throws Throwable {
executorService.execute(()->{
PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) invocation.getArguments()[0];
- pendingAckHandle.close();
+ pendingAckHandle.closeAsync();
MLPendingAckReplyCallBack mlPendingAckReplyCallBack
= new MLPendingAckReplyCallBack(pendingAckHandle);
mlPendingAckReplyCallBack.replayComplete();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index e75c9534c59..dc5266d8869 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -21,7 +21,10 @@ package org.apache.pulsar.broker.transaction.pendingack;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.testng.AssertJUnit.assertNotNull;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -36,6 +39,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.AbstractTopic;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
@@ -58,6 +64,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -703,4 +710,54 @@ public class PendingAckPersistentTest extends TransactionTestBase {
patternConsumer.close();
producer.close();
}
+
+ @Test
+ public void testGetManagedLegerConfigFailThenUnload() throws Exception {
+ String topic = TopicName.get(TopicDomain.persistent.toString(),
+ NamespaceName.get(NAMESPACE1), "testGetManagedLegerConfigFailThenUnload").toString();
+
+ String subscriptionName = "sub";
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false)
+ .topic(topic).create();
+
+ PersistentTopic persistentTopic =
+ (PersistentTopic) getPulsarServiceList()
+ .get(0)
+ .getBrokerService()
+ .getTopic(topic, false)
+ .get().orElse(null);
+
+ assertNotNull(persistentTopic);
+ BrokerService brokerService = spy(persistentTopic.getBrokerService());
+ doReturn(FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException("test")))
+ .when(brokerService).getManagedLedgerConfig(any());
+ Field field = AbstractTopic.class.getDeclaredField("brokerService");
+ field.setAccessible(true);
+ field.set(persistentTopic, brokerService);
+
+ // init pending ack store
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .topic(topic)
+ .subscribe();
+
+ producer.send("test");
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(30, TimeUnit.SECONDS).build().get();
+
+ // pending ack init fail, so the ack will throw exception
+ try {
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof PulsarClientException.LookupException);
+ }
+
+ // can unload success
+ admin.topics().unload(topic);
+ }
}