You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/16 11:51:45 UTC

[pulsar] branch branch-2.11 updated: [fix][txn] transaction pending ack store future not completely problem (#18943)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new a968da59abd [fix][txn] transaction pending ack store future not completely problem (#18943)
a968da59abd is described below

commit a968da59abd43a038b973226edbcc55cfa48e340
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Dec 16 17:41:33 2022 +0800

    [fix][txn] transaction pending ack store future not completely problem (#18943)
    
    When `MLPendingAckStoreProvider` initializes the PendingAckStore and getting the ManagedLedger config throws an exception, the exception is not handled, so the `pendingAckStoreFuture` is never completed. Topic unload uses this future to close the pending ack, so the unload cannot complete either.
    https://github.com/apache/pulsar/blob/3011946a5c3b64ed7c08b6bfb1f6492f8aaaca9c/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java#L114-L115
    When getting the ManagedLedger config fails, `pendingAckStoreFuture` is now completed via `completeExceptionally()`.
    
    When the pendingAckStore fails to initialize, closing the pendingAckHandle now succeeds directly.
    
    Mock getting the ManagedLedger config so that it fails with an exception, then verify that the topic unload still succeeds.
    
    (cherry picked from commit 1d9956cf9fc081b219d1db14eb2686677ea63021)
---
 .../service/persistent/PersistentSubscription.java |  2 +-
 .../transaction/pendingack/PendingAckHandle.java   |  2 +-
 .../pendingack/impl/MLPendingAckStoreProvider.java | 25 +++++----
 .../pendingack/impl/PendingAckHandleDisabled.java  |  2 +-
 .../pendingack/impl/PendingAckHandleImpl.java      | 36 +++++++++----
 .../pulsar/broker/transaction/TransactionTest.java |  2 +-
 .../pendingack/PendingAckPersistentTest.java       | 59 +++++++++++++++++++++-
 7 files changed, 105 insertions(+), 23 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index fcb4135c97e..fe3faf35bc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -877,7 +877,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
             if (dispatcher != null && dispatcher.isConsumerConnected()) {
                 return FutureUtil.failedFuture(new SubscriptionBusyException("Subscription has active consumers"));
             }
-            return this.pendingAckHandle.close().thenAccept(v -> {
+            return this.pendingAckHandle.closeAsync().thenAccept(v -> {
                 IS_FENCED_UPDATER.set(this, TRUE);
                 log.info("[{}][{}] Successfully closed subscription [{}]", topicName, subName, cursor);
             });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
index d8e16dd4015..b51aea23e6f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
@@ -151,7 +151,7 @@ public interface PendingAckHandle {
      *
      * @return the future of this operation.
      */
-    CompletableFuture<Void> close();
+    CompletableFuture<Void> closeAsync();
 
     /**
      * Check if the PendingAckStore is init.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index fdff9f59146..0a4d201a87f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
 
 
@@ -131,15 +132,21 @@ public class MLPendingAckStoreProvider implements TransactionPendingAckStoreProv
                                         pendingAckStoreFuture.completeExceptionally(exception);
                                     }
                                 }, () -> true, null);
-            });
-        }).exceptionally(e -> {
-            log.error("Failed to obtain the existence of ManagerLedger with topic and subscription : "
-                    + originPersistentTopic.getSubscriptions() + "  "
-                    + subscription.getName());
-            pendingAckStoreFuture.completeExceptionally(
-                    e.getCause());
-            return null;
-        });
+                    }).exceptionally(e -> {
+                        Throwable t = FutureUtil.unwrapCompletionException(e);
+                        log.error("[{}] [{}] Failed to get managedLedger config when init pending ack store!",
+                                originPersistentTopic, subscription, t);
+                        pendingAckStoreFuture.completeExceptionally(t);
+                        return null;
+
+                    });
+                }).exceptionally(e -> {
+                    Throwable t = FutureUtil.unwrapCompletionException(e);
+                    log.error("[{}] [{}] Failed to check the pending ack topic exist when init pending ack store!",
+                            originPersistentTopic, subscription, t);
+                    pendingAckStoreFuture.completeExceptionally(t);
+                    return null;
+                });
         return pendingAckStoreFuture;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
index e34628da887..e02502014e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
@@ -93,7 +93,7 @@ public class PendingAckHandleDisabled implements PendingAckHandle {
     }
 
     @Override
-    public CompletableFuture<Void> close() {
+    public CompletableFuture<Void> closeAsync() {
         return CompletableFuture.completedFuture(null);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 38d81d059b7..64652faa6e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -153,9 +153,11 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
                         completeHandleFuture();
                     }
                 })
-                .exceptionally(t -> {
+                .exceptionally(e -> {
+                    Throwable t = FutureUtil.unwrapCompletionException(e);
                     changeToErrorState();
                     exceptionHandleFuture(t);
+                    this.pendingAckStoreFuture.completeExceptionally(t);
                     return null;
                 });
     }
@@ -168,13 +170,13 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
                 this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
                     recoverTime.setRecoverStartTime(System.currentTimeMillis());
                     pendingAckStore.replayAsync(this, internalPinnedExecutor);
-                }).exceptionally(e -> {
-                    acceptQueue.clear();
+                }).exceptionallyAsync(e -> {
+                    handleCacheRequest();
                     changeToErrorState();
                     log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
                     exceptionHandleFuture(e.getCause());
                     return null;
-                });
+                }, internalPinnedExecutor);
             }
         }
     }
@@ -927,9 +929,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
         if (!this.pendingAckHandleCompletableFuture.isDone()) {
             this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
         }
-        if (recoverTime.getRecoverStartTime() == 0L) {
-            return;
-        } else {
+        if (recoverTime.getRecoverStartTime() != 0L) {
             recoverTime.setRecoverEndTime(System.currentTimeMillis());
         }
     }
@@ -963,11 +963,29 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
     }
 
     @Override
-    public CompletableFuture<Void> close() {
+    public CompletableFuture<Void> closeAsync() {
         changeToCloseState();
         synchronized (PendingAckHandleImpl.this) {
             if (this.pendingAckStoreFuture != null) {
-                return this.pendingAckStoreFuture.thenAccept(PendingAckStore::closeAsync);
+                CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+                this.pendingAckStoreFuture.whenComplete((pendingAckStore, e) -> {
+                    if (e != null) {
+                        // init pending ack store fail, close don't need to
+                        // retry and throw exception, complete directly
+                        closeFuture.complete(null);
+                    } else {
+                        pendingAckStore.closeAsync().whenComplete((q, ex) -> {
+                            if (ex != null) {
+                                Throwable t = FutureUtil.unwrapCompletionException(ex);
+                                closeFuture.completeExceptionally(t);
+                            } else {
+                                closeFuture.complete(null);
+                            }
+                        });
+                    }
+                });
+
+                return closeFuture;
             } else {
                 return CompletableFuture.completedFuture(null);
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 76b91801a73..1043078d686 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1299,7 +1299,7 @@ public class TransactionTest extends TransactionTestBase {
             public Object answer(InvocationOnMock invocation) throws Throwable {
                 executorService.execute(()->{
                     PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) invocation.getArguments()[0];
-                    pendingAckHandle.close();
+                    pendingAckHandle.closeAsync();
                     MLPendingAckReplyCallBack mlPendingAckReplyCallBack
                             = new MLPendingAckReplyCallBack(pendingAckHandle);
                     mlPendingAckReplyCallBack.replayComplete();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index e75c9534c59..dc5266d8869 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -21,7 +21,10 @@ package org.apache.pulsar.broker.transaction.pendingack;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.testng.AssertJUnit.assertNotNull;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -36,6 +39,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.AbstractTopic;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
@@ -58,6 +64,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -703,4 +710,54 @@ public class PendingAckPersistentTest extends TransactionTestBase {
         patternConsumer.close();
         producer.close();
     }
+
+    @Test
+    public void testGetManagedLegerConfigFailThenUnload() throws Exception {
+        String topic = TopicName.get(TopicDomain.persistent.toString(),
+                NamespaceName.get(NAMESPACE1), "testGetManagedLegerConfigFailThenUnload").toString();
+
+        String subscriptionName = "sub";
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false)
+                .topic(topic).create();
+
+        PersistentTopic persistentTopic =
+                (PersistentTopic) getPulsarServiceList()
+                        .get(0)
+                        .getBrokerService()
+                        .getTopic(topic, false)
+                        .get().orElse(null);
+
+        assertNotNull(persistentTopic);
+        BrokerService brokerService = spy(persistentTopic.getBrokerService());
+        doReturn(FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException("test")))
+                .when(brokerService).getManagedLedgerConfig(any());
+        Field field = AbstractTopic.class.getDeclaredField("brokerService");
+        field.setAccessible(true);
+        field.set(persistentTopic, brokerService);
+
+        // init pending ack store
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .topic(topic)
+                .subscribe();
+
+        producer.send("test");
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(30, TimeUnit.SECONDS).build().get();
+
+        // pending ack init fail, so the ack will throw exception
+        try {
+            consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof PulsarClientException.LookupException);
+        }
+
+        // can unload success
+        admin.topics().unload(topic);
+    }
 }