You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2024/04/15 00:58:36 UTC
(pulsar) branch branch-3.1 updated: [fix][txn]Handle exceptions in the transaction pending ack init (#21274)
This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new ce99a96bb5b [fix][txn]Handle exceptions in the transaction pending ack init (#21274)
ce99a96bb5b is described below
commit ce99a96bb5b088dd37dc2ecb97ef7bb9d3ae3deb
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Mon Apr 15 08:43:12 2024 +0800
[fix][txn]Handle exceptions in the transaction pending ack init (#21274)
Co-authored-by: Baodi Shi <ba...@apache.org>
(cherry picked from commit 5d18ff7b70f9de3b95d83f6a8fd4756b1c34567b)
---
.../apache/pulsar/broker/service/ServerCnx.java | 2 +-
.../pendingack/impl/PendingAckHandleImpl.java | 54 ++++++++++++--
.../pulsar/broker/transaction/TransactionTest.java | 2 +-
.../pendingack/PendingAckPersistentTest.java | 82 ++++++++++++++++++++++
4 files changed, 132 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 29ba7cb866e..c5342dd3dff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1311,7 +1311,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
// Send error back to client, only if not completed already.
if (consumerFuture.completeExceptionally(exception)) {
commandSender.sendErrorResponse(requestId,
- BrokerServiceException.getClientErrorCode(exception),
+ BrokerServiceException.getClientErrorCode(exception.getCause()),
exception.getCause().getMessage());
}
consumers.remove(consumerId, consumerFuture);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 7dbe0385fd7..5ed271c6fd4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -22,6 +22,7 @@ import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.Timer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -35,9 +36,11 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -45,6 +48,7 @@ import org.apache.commons.collections4.map.LinkedMap;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.Consumer;
@@ -53,7 +57,9 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
@@ -134,6 +140,12 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
+ private final long pendingAckInitFailureBackoffInitialTimeInMs = 100;
+
+ public final Backoff backoff = new Backoff(pendingAckInitFailureBackoffInitialTimeInMs, TimeUnit.MILLISECONDS,
+ 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+
+ private final Timer transactionOpTimer;
public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
super(State.None);
@@ -153,7 +165,11 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
this.pendingAckStoreProvider = this.persistentSubscription.getTopic()
.getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
+ transactionOpTimer = persistentSubscription.getTopic().getBrokerService().getPulsar().getTransactionTimer();
+ init();
+ }
+ private void init() {
pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
.thenAcceptAsync(init -> {
if (init) {
@@ -164,9 +180,9 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
}, internalPinnedExecutor)
.exceptionallyAsync(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
- changeToErrorState();
+ // Delegate all failure handling to `exceptionHandleFuture` so that the
+ // retry and error-state logic lives in one place.
exceptionHandleFuture(t);
- this.pendingAckStoreFuture.completeExceptionally(t);
return null;
}, internalPinnedExecutor);
}
@@ -180,9 +196,8 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
recoverTime.setRecoverStartTime(System.currentTimeMillis());
pendingAckStore.replayAsync(this, internalPinnedExecutor);
}).exceptionallyAsync(e -> {
- handleCacheRequest();
- changeToErrorState();
- log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
+ // Delegate all failure handling to `exceptionHandleFuture` so that the
+ // retry and error-state logic lives in one place.
exceptionHandleFuture(e.getCause());
return null;
}, internalPinnedExecutor);
@@ -945,12 +960,39 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
}
public void exceptionHandleFuture(Throwable t) {
- final boolean completedNow = this.pendingAckHandleCompletableFuture.completeExceptionally(t);
+ if (isRetryableException(t)) {
+ this.state = State.None;
+ long retryTime = backoff.next();
+ log.warn("[{}][{}] Failed to init transaction pending ack. It will be retried in {} Ms",
+ persistentSubscription.getTopic().getName(), subName, retryTime, t);
+ transactionOpTimer.newTimeout((timeout) -> init(), retryTime, TimeUnit.MILLISECONDS);
+ return;
+ }
+ log.error("[{}] [{}] PendingAckHandleImpl init fail!", topicName, subName, t);
+ handleCacheRequest();
+ changeToErrorState();
+ // TODO: Add a dedicated serverError, `TransactionComponentLoadFailedException`.
+ // Until it exists, `Unknown` is returned instead.
+ this.pendingAckStoreFuture = FutureUtil.failedFuture(new BrokerServiceException(
+ String.format("[%s][%s] Failed to init transaction pending ack.", topicName, subName)));
+ final boolean completedNow = this.pendingAckHandleCompletableFuture.completeExceptionally(
+ new BrokerServiceException(
+ String.format("[%s][%s] Failed to init transaction pending ack.", topicName, subName)));
if (completedNow) {
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}
+ private static boolean isRetryableException(Throwable ex) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ return (realCause instanceof ManagedLedgerException
+ && !(realCause instanceof ManagedLedgerException.ManagedLedgerFencedException)
+ && !(realCause instanceof ManagedLedgerException.NonRecoverableLedgerException))
+ || realCause instanceof PulsarClientException.BrokerPersistenceException
+ || realCause instanceof PulsarClientException.LookupException
+ || realCause instanceof PulsarClientException.ConnectException;
+ }
+
@Override
public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) {
TransactionInPendingAckStats transactionInPendingAckStats = new TransactionInPendingAckStats();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index c7e545618db..dd6466e8d3b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1505,7 +1505,7 @@ public class TransactionTest extends TransactionTestBase {
fail("Expect failure by PendingAckHandle closed, but success");
} catch (ExecutionException executionException){
Throwable t = executionException.getCause();
- Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
+ Assert.assertTrue(t instanceof BrokerServiceException);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index bc537fb784f..2e154715ac9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -21,7 +21,9 @@ package org.apache.pulsar.broker.transaction.pendingack;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
@@ -42,8 +44,10 @@ import com.google.common.collect.Multimap;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -58,6 +62,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
@@ -99,6 +104,83 @@ public class PendingAckPersistentTest extends TransactionTestBase {
super.internalCleanup();
}
+ /**
+ * Tests that a consumer can be built successfully when a retryable exception occurs,
+ * and that it gets the correct error when a non-retryable exception occurs.
+ * @throws Exception
+ */
+ @Test(timeOut = 60000)
+ public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
+ // 1. Prepare and make sure the consumer can be built successfully.
+ String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck";
+ @Cleanup
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .subscriptionName("subName1")
+ .topic(topic)
+ .subscribe();
+ // 2. Mock a transactionPendingAckStoreProvider to test building consumer
+ // failing at transactionPendingAckStoreProvider::checkInitializedBefore.
+ Field transactionPendingAckStoreProviderField = PulsarService.class
+ .getDeclaredField("transactionPendingAckStoreProvider");
+ transactionPendingAckStoreProviderField.setAccessible(true);
+ TransactionPendingAckStoreProvider pendingAckStoreProvider =
+ (TransactionPendingAckStoreProvider) transactionPendingAckStoreProviderField
+ .get(pulsarServiceList.get(0));
+ TransactionPendingAckStoreProvider mockProvider = mock(pendingAckStoreProvider.getClass());
+ // 3. Test a retryable exception from checkInitializedBefore:
+ // The consumer will be built successfully after a single retry.
+ when(mockProvider.checkInitializedBefore(any()))
+ // First, the method checkInitializedBefore will fail with a retryable exception.
+ .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail initialize")))
+ // Then, the method will be executed successfully.
+ .thenReturn(CompletableFuture.completedFuture(false));
+ transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
+ @Cleanup
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ .subscriptionName("subName2")
+ .topic(topic)
+ .subscribe();
+
+ // 4. Test a retryable exception from newPendingAckStore:
+ // The consumer will be built successfully after a single retry.
+ when(mockProvider.checkInitializedBefore(any()))
+ .thenReturn(CompletableFuture.completedFuture(true));
+
+ when(mockProvider.newPendingAckStore(any()))
+ // First, the method newPendingAckStore will fail with a retryable exception.
+ .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail new store")))
+ // Then, the method will be executed successfully.
+ .thenCallRealMethod();
+ transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
+ @Cleanup
+ Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
+ .subscriptionName("subName3")
+ .topic(topic)
+ .subscribe();
+
+ // 5. Test a non-retryable exception:
+ // Building the consumer will fail without any retry.
+ when(mockProvider.checkInitializedBefore(any()))
+ // The method checkInitializedBefore will fail with a non-retryable exception, so no retry happens.
+ .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException
+ .NonRecoverableLedgerException("mock fail")))
+ .thenReturn(CompletableFuture.completedFuture(false));
+ @Cleanup PulsarClient pulsarClient = PulsarClient.builder()
+ .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+ .operationTimeout(3, TimeUnit.SECONDS)
+ .build();
+ try {
+ @Cleanup
+ Consumer<byte[]> consumer4 = pulsarClient.newConsumer()
+ .subscriptionName("subName4")
+ .topic(topic)
+ .subscribe();
+ fail();
+ } catch (Exception exception) {
+ assertTrue(exception.getMessage().contains("Failed to init transaction pending ack."));
+ }
+ }
+
@Test
public void individualPendingAckReplayTest() throws Exception {
int messageCount = 1000;