You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/16 11:25:41 UTC
[pulsar] branch branch-2.10 updated: [fix][txn] transaction pending ack store future not completely problem (#18943)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 64257a71614 [fix][txn] transaction pending ack store future not completely problem (#18943)
64257a71614 is described below
commit 64257a71614e44394307a47d608155de25c5e87e
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Dec 16 17:41:33 2022 +0800
[fix][txn] transaction pending ack store future not completely problem (#18943)
when `MLPendingAckStoreProvider` init PendingAckStore gets the ManagedLedger config throw exception, we don't handle the exception. and the `pendingAckStoreFeture` can't be complete, topic unload will use this future to close the pendingAck.
https://github.com/apache/pulsar/blob/3011946a5c3b64ed7c08b6bfb1f6492f8aaaca9c/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java#L114-L115
when getting managedledger config to fail, `pendingAckStoreFeture` will `completeExceptionally()`;
when pendingAckStore init fail, close pendingAckHandle success directly
mock get managedLeger config throw exception, then unload can success
(cherry picked from commit 1d9956cf9fc081b219d1db14eb2686677ea63021)
---
.../service/persistent/PersistentSubscription.java | 2 +-
.../transaction/pendingack/PendingAckHandle.java | 2 +-
.../pendingack/impl/MLPendingAckStoreProvider.java | 25 +++++----
.../pendingack/impl/PendingAckHandleDisabled.java | 2 +-
.../pendingack/impl/PendingAckHandleImpl.java | 28 ++++++++--
.../pulsar/broker/transaction/TransactionTest.java | 2 +-
.../pendingack/PendingAckPersistentTest.java | 59 +++++++++++++++++++++-
7 files changed, 102 insertions(+), 18 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index cab555bb9cd..c2999c1c741 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -763,7 +763,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
if (dispatcher != null && dispatcher.isConsumerConnected()) {
return FutureUtil.failedFuture(new SubscriptionBusyException("Subscription has active consumers"));
}
- return this.pendingAckHandle.close().thenAccept(v -> {
+ return this.pendingAckHandle.closeAsync().thenAccept(v -> {
IS_FENCED_UPDATER.set(this, TRUE);
log.info("[{}][{}] Successfully closed subscription [{}]", topicName, subName, cursor);
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
index 620db5c4b48..e9984baf007 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
@@ -150,7 +150,7 @@ public interface PendingAckHandle {
*
* @return the future of this operation.
*/
- CompletableFuture<Void> close();
+ CompletableFuture<Void> closeAsync();
/**
* Check if the PendingAckStore is init.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index 6b84d6e329a..7fe2a775058 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
/**
@@ -107,15 +108,21 @@ public class MLPendingAckStoreProvider implements TransactionPendingAckStoreProv
pendingAckStoreFuture.completeExceptionally(exception);
}
}, () -> true, null);
- });
- }).exceptionally(e -> {
- log.error("Failed to obtain the existence of ManagerLedger with topic and subscription : "
- + originPersistentTopic.getSubscriptions() + " "
- + subscription.getName());
- pendingAckStoreFuture.completeExceptionally(
- e.getCause());
- return null;
- });
+ }).exceptionally(e -> {
+ Throwable t = FutureUtil.unwrapCompletionException(e);
+ log.error("[{}] [{}] Failed to get managedLedger config when init pending ack store!",
+ originPersistentTopic, subscription, t);
+ pendingAckStoreFuture.completeExceptionally(t);
+ return null;
+
+ });
+ }).exceptionally(e -> {
+ Throwable t = FutureUtil.unwrapCompletionException(e);
+ log.error("[{}] [{}] Failed to check the pending ack topic exist when init pending ack store!",
+ originPersistentTopic, subscription, t);
+ pendingAckStoreFuture.completeExceptionally(t);
+ return null;
+ });
return pendingAckStoreFuture;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
index 119072700df..6a0aca6f9d7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
@@ -92,7 +92,7 @@ public class PendingAckHandleDisabled implements PendingAckHandle {
}
@Override
- public CompletableFuture<Void> close() {
+ public CompletableFuture<Void> closeAsync() {
return CompletableFuture.completedFuture(null);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 3422401ab83..6bdc1ee03a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -150,9 +150,11 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
completeHandleFuture();
}
})
- .exceptionally(t -> {
+ .exceptionally(e -> {
+ Throwable t = FutureUtil.unwrapCompletionException(e);
changeToErrorState();
exceptionHandleFuture(t);
+ this.pendingAckStoreFuture.completeExceptionally(t);
return null;
});
}
@@ -165,7 +167,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
pendingAckStore.replayAsync(this, internalPinnedExecutor);
}).exceptionally(e -> {
- acceptQueue.clear();
+ handleCacheRequest();
changeToErrorState();
log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
exceptionHandleFuture(e.getCause());
@@ -940,11 +942,29 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
}
@Override
- public CompletableFuture<Void> close() {
+ public CompletableFuture<Void> closeAsync() {
changeToCloseState();
synchronized (PendingAckHandleImpl.this) {
if (this.pendingAckStoreFuture != null) {
- return this.pendingAckStoreFuture.thenAccept(PendingAckStore::closeAsync);
+ CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ this.pendingAckStoreFuture.whenComplete((pendingAckStore, e) -> {
+ if (e != null) {
+ // init pending ack store fail, close don't need to
+ // retry and throw exception, complete directly
+ closeFuture.complete(null);
+ } else {
+ pendingAckStore.closeAsync().whenComplete((q, ex) -> {
+ if (ex != null) {
+ Throwable t = FutureUtil.unwrapCompletionException(ex);
+ closeFuture.completeExceptionally(t);
+ } else {
+ closeFuture.complete(null);
+ }
+ });
+ }
+ });
+
+ return closeFuture;
} else {
return CompletableFuture.completedFuture(null);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 6a90c9a63d8..13040fce34c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1130,7 +1130,7 @@ public class TransactionTest extends TransactionTestBase {
public Object answer(InvocationOnMock invocation) throws Throwable {
executorService.execute(()->{
PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) invocation.getArguments()[0];
- pendingAckHandle.close();
+ pendingAckHandle.closeAsync();
MLPendingAckReplyCallBack mlPendingAckReplyCallBack
= new MLPendingAckReplyCallBack(pendingAckHandle);
mlPendingAckReplyCallBack.replayComplete();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 8f97dd6b45e..f9047a849d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -21,7 +21,10 @@ package org.apache.pulsar.broker.transaction.pendingack;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.testng.AssertJUnit.assertNotNull;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -36,6 +39,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.AbstractTopic;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
@@ -57,6 +63,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -701,4 +708,54 @@ public class PendingAckPersistentTest extends TransactionTestBase {
patternConsumer.close();
producer.close();
}
+
+ @Test
+ public void testGetManagedLegerConfigFailThenUnload() throws Exception {
+ String topic = TopicName.get(TopicDomain.persistent.toString(),
+ NamespaceName.get(NAMESPACE1), "testGetManagedLegerConfigFailThenUnload").toString();
+
+ String subscriptionName = "sub";
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false)
+ .topic(topic).create();
+
+ PersistentTopic persistentTopic =
+ (PersistentTopic) getPulsarServiceList()
+ .get(0)
+ .getBrokerService()
+ .getTopic(topic, false)
+ .get().orElse(null);
+
+ assertNotNull(persistentTopic);
+ BrokerService brokerService = spy(persistentTopic.getBrokerService());
+ doReturn(FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException("test")))
+ .when(brokerService).getManagedLedgerConfig(any());
+ Field field = AbstractTopic.class.getDeclaredField("brokerService");
+ field.setAccessible(true);
+ field.set(persistentTopic, brokerService);
+
+ // init pending ack store
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .topic(topic)
+ .subscribe();
+
+ producer.send("test");
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(30, TimeUnit.SECONDS).build().get();
+
+ // pending ack init fail, so the ack will throw exception
+ try {
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof PulsarClientException.LookupException);
+ }
+
+ // can unload success
+ admin.topics().unload(topic);
+ }
}