You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2024/04/15 00:58:36 UTC

(pulsar) branch branch-3.1 updated: [fix][txn]Handle exceptions in the transaction pending ack init (#21274)

This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new ce99a96bb5b [fix][txn]Handle exceptions in the transaction pending ack init (#21274)
ce99a96bb5b is described below

commit ce99a96bb5b088dd37dc2ecb97ef7bb9d3ae3deb
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Mon Apr 15 08:43:12 2024 +0800

    [fix][txn]Handle exceptions in the transaction pending ack init (#21274)
    
    Co-authored-by: Baodi Shi <ba...@apache.org>
    (cherry picked from commit 5d18ff7b70f9de3b95d83f6a8fd4756b1c34567b)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  2 +-
 .../pendingack/impl/PendingAckHandleImpl.java      | 54 ++++++++++++--
 .../pulsar/broker/transaction/TransactionTest.java |  2 +-
 .../pendingack/PendingAckPersistentTest.java       | 82 ++++++++++++++++++++++
 4 files changed, 132 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 29ba7cb866e..c5342dd3dff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1311,7 +1311,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                             // Send error back to client, only if not completed already.
                             if (consumerFuture.completeExceptionally(exception)) {
                                 commandSender.sendErrorResponse(requestId,
-                                        BrokerServiceException.getClientErrorCode(exception),
+                                        BrokerServiceException.getClientErrorCode(exception.getCause()),
                                         exception.getCause().getMessage());
                             }
                             consumers.remove(consumerId, consumerFuture);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 7dbe0385fd7..5ed271c6fd4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -22,6 +22,7 @@ import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.Timer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -35,9 +36,11 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -45,6 +48,7 @@ import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.Consumer;
@@ -53,7 +57,9 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
@@ -134,6 +140,12 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
 
     public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
 
+    private final long pendingAckInitFailureBackoffInitialTimeInMs = 100;
+
+    public final Backoff backoff = new Backoff(pendingAckInitFailureBackoffInitialTimeInMs, TimeUnit.MILLISECONDS,
+            1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+
+    private final Timer transactionOpTimer;
 
     public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
         super(State.None);
@@ -153,7 +165,11 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
 
         this.pendingAckStoreProvider = this.persistentSubscription.getTopic()
                         .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
+        transactionOpTimer = persistentSubscription.getTopic().getBrokerService().getPulsar().getTransactionTimer();
+        init();
+    }
 
+    private void init() {
         pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
                 .thenAcceptAsync(init -> {
                     if (init) {
@@ -164,9 +180,9 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
                 }, internalPinnedExecutor)
                 .exceptionallyAsync(e -> {
                     Throwable t = FutureUtil.unwrapCompletionException(e);
-                    changeToErrorState();
+                    // Delegate all failure handling to `exceptionHandleFuture`
+                    // so that the exception handling is kept in one place and is easier to follow.
                     exceptionHandleFuture(t);
-                    this.pendingAckStoreFuture.completeExceptionally(t);
                     return null;
                 }, internalPinnedExecutor);
     }
@@ -180,9 +196,8 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
                     recoverTime.setRecoverStartTime(System.currentTimeMillis());
                     pendingAckStore.replayAsync(this, internalPinnedExecutor);
                 }).exceptionallyAsync(e -> {
-                    handleCacheRequest();
-                    changeToErrorState();
-                    log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
+                    // Delegate all failure handling to `exceptionHandleFuture`
+                    // so that the exception handling is kept in one place and is easier to follow.
                     exceptionHandleFuture(e.getCause());
                     return null;
                 }, internalPinnedExecutor);
@@ -945,12 +960,39 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
     }
 
     public void exceptionHandleFuture(Throwable t) {
-        final boolean completedNow = this.pendingAckHandleCompletableFuture.completeExceptionally(t);
+        if (isRetryableException(t)) {
+            this.state = State.None;
+            long retryTime = backoff.next();
+            log.warn("[{}][{}] Failed to init transaction pending ack. It will be retried in {} Ms",
+                    persistentSubscription.getTopic().getName(), subName, retryTime, t);
+            transactionOpTimer.newTimeout((timeout) -> init(), retryTime, TimeUnit.MILLISECONDS);
+            return;
+        }
+        log.error("[{}] [{}] PendingAckHandleImpl init fail!", topicName, subName, t);
+        handleCacheRequest();
+        changeToErrorState();
+        // TODO: Add a new serverError `TransactionComponentLoadFailedException`;
+        //  until it exists, an `Unknown` error is returned instead.
+        this.pendingAckStoreFuture = FutureUtil.failedFuture(new BrokerServiceException(
+                        String.format("[%s][%s] Failed to init transaction pending ack.", topicName, subName)));
+        final boolean completedNow = this.pendingAckHandleCompletableFuture.completeExceptionally(
+                new BrokerServiceException(
+                String.format("[%s][%s] Failed to init transaction pending ack.", topicName, subName)));
         if (completedNow) {
             recoverTime.setRecoverEndTime(System.currentTimeMillis());
         }
     }
 
+    private static boolean isRetryableException(Throwable ex) {
+        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+        return (realCause instanceof ManagedLedgerException
+                && !(realCause instanceof ManagedLedgerException.ManagedLedgerFencedException)
+                && !(realCause instanceof ManagedLedgerException.NonRecoverableLedgerException))
+                || realCause instanceof PulsarClientException.BrokerPersistenceException
+                || realCause instanceof PulsarClientException.LookupException
+                || realCause instanceof PulsarClientException.ConnectException;
+    }
+
     @Override
     public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) {
         TransactionInPendingAckStats transactionInPendingAckStats = new TransactionInPendingAckStats();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index c7e545618db..dd6466e8d3b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1505,7 +1505,7 @@ public class TransactionTest extends TransactionTestBase {
             fail("Expect failure by PendingAckHandle closed, but success");
         } catch (ExecutionException executionException){
             Throwable t = executionException.getCause();
-            Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
+            Assert.assertTrue(t instanceof BrokerServiceException);
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index bc537fb784f..2e154715ac9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -21,7 +21,9 @@ package org.apache.pulsar.broker.transaction.pendingack;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.assertTrue;
@@ -42,8 +44,10 @@ import com.google.common.collect.Multimap;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -58,6 +62,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -99,6 +104,83 @@ public class PendingAckPersistentTest extends TransactionTestBase {
         super.internalCleanup();
     }
 
+    /**
+     * Tests that a consumer can be built successfully after a retryable exception
+     * and gets the correct error on a non-retryable exception.
+     * @throws Exception
+     */
+    @Test(timeOut = 60000)
+    public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
+        // 1. Prepare and make sure the consumer can be built successfully.
+        String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck";
+        @Cleanup
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .subscriptionName("subName1")
+                .topic(topic)
+                .subscribe();
+        // 2. Mock a transactionPendingAckStoreProvider to test building consumer
+        // failing at transactionPendingAckStoreProvider::checkInitializedBefore.
+        Field transactionPendingAckStoreProviderField = PulsarService.class
+                .getDeclaredField("transactionPendingAckStoreProvider");
+        transactionPendingAckStoreProviderField.setAccessible(true);
+        TransactionPendingAckStoreProvider pendingAckStoreProvider =
+                (TransactionPendingAckStoreProvider) transactionPendingAckStoreProviderField
+                        .get(pulsarServiceList.get(0));
+        TransactionPendingAckStoreProvider mockProvider = mock(pendingAckStoreProvider.getClass());
+        // 3. Test retryable exception when checkInitializedBefore:
+        // The consumer will be built successfully after one retry.
+        when(mockProvider.checkInitializedBefore(any()))
+                // First, the method checkInitializedBefore will fail with a retryable exception.
+                .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail initialize")))
+                // Then, the method will be executed successfully.
+                .thenReturn(CompletableFuture.completedFuture(false));
+        transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
+        @Cleanup
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .subscriptionName("subName2")
+                .topic(topic)
+                .subscribe();
+
+        // 4. Test retryable exception when newPendingAckStore:
+        // The consumer will be built successfully after one retry.
+        when(mockProvider.checkInitializedBefore(any()))
+                .thenReturn(CompletableFuture.completedFuture(true));
+
+        when(mockProvider.newPendingAckStore(any()))
+                // First, the method newPendingAckStore will fail with a retryable exception.
+                .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail new store")))
+                // Then, the method will be executed successfully.
+                .thenCallRealMethod();
+        transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
+        @Cleanup
+        Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
+                .subscriptionName("subName3")
+                .topic(topic)
+                .subscribe();
+
+        // 5. Test non-retryable exception:
+        // Building the consumer will fail without retrying.
+        when(mockProvider.checkInitializedBefore(any()))
+                // The method checkInitializedBefore will fail with a non-retryable exception without retrying.
+                .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException
+                        .NonRecoverableLedgerException("mock fail")))
+                .thenReturn(CompletableFuture.completedFuture(false));
+        @Cleanup PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .build();
+        try {
+            @Cleanup
+            Consumer<byte[]> consumer4 = pulsarClient.newConsumer()
+                    .subscriptionName("subName4")
+                    .topic(topic)
+                    .subscribe();
+            fail();
+        } catch (Exception exception) {
+            assertTrue(exception.getMessage().contains("Failed to init transaction pending ack."));
+        }
+    }
+
     @Test
     public void individualPendingAckReplayTest() throws Exception {
         int messageCount = 1000;