You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/06/30 14:33:24 UTC
[pulsar] branch master updated: [fix] [transaction] Cmd-Subscribe and Cmd-Producer will not succeed even after 100 retries (#16248)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ac7331e06e5 [fix] [transaction] Cmd-Subscribe and Cmd-Producer will not succeed even after 100 retries (#16248)
ac7331e06e5 is described below
commit ac7331e06e597851a75e544130c0602a780a62cd
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu Jun 30 22:33:17 2022 +0800
[fix] [transaction] Cmd-Subscribe and Cmd-Producer will not succeed even after 100 retries (#16248)
### Motivation
E.g. `client.lookup(by producer)`, `topic.unload`, `consumer.subscribe` executed at the same time:
| Time | `client.lookup(by producer)` | `topic.unload` | `consumer.subscribe` |
| ----------- | ----------- | ----------- | ----------- |
| 1 | | | `ServerCnx.consumers.putIfAbsent(consumerId, consumerFuture)` |
| 2 | get existing persistent topic | | |
| 3 | create a new persistent subscription | | |
| 4 | create a new pending ack handle | | |
| 5 | replay async | | |
| 6 | | | waiting for pending ack log replay to finish |
| 6 | | topic.close | |
| 7 | | async close subscription | |
| 8 | | change pending ack handle state --> `close` | |
| 9 | change pending ack handle state `init` --> `ready` | | |
| 10 | change state failure | | |
| 11 | | | subscribe timeout |
| 12 | | | retry subscribe |
| 13 | | | get existing `consumerFuture` in `ServerCnx.consumers` |
| 14 | | | waiting for pending ack log replay to finish |
| 15 | | | subscribe timeout |
| 16 | | | ...... (loop step12 ~ step15) |
Step 6/14: `PersistentSubscription.addConsumer` waits for the pending ack replay to finish.
https://github.com/apache/pulsar/blob/ebcc47ee7ceb43f680640ad72e51a06d9856458d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L205-L206
Step 7: Because `PersistentSubscription.addConsumer` has not finished, no consumers are closed.
Step 10: A failure to change the pending-ack-handle state does not terminate (complete) the `pendingAckHandleFuture`.
https://github.com/apache/pulsar/blob/ebcc47ee7ceb43f680640ad72e51a06d9856458d/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java#L46-L61
After step 11: Cmd-Subscribe will not succeed even after 100 retries.
### Modifications
When changing the pending-ack-handle state fails, complete `pendingAckHandleFuture` exceptionally.
---
.../buffer/impl/TopicTransactionBuffer.java | 11 +-
.../pendingack/impl/MLPendingAckReplyCallBack.java | 7 +-
.../pulsar/broker/transaction/TransactionTest.java | 173 +++++++++++++++++++++
3 files changed, 187 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 39855106cc6..ccd6bc2b5ea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.transaction.buffer.impl;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;
import io.netty.util.Timer;
@@ -131,7 +132,12 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
}
if (!changeToReadyState()) {
- log.error("[{}]Transaction buffer recover fail", topic.getName());
+ log.error("[{}]Transaction buffer recover fail, current state: {}",
+ topic.getName(), getState());
+ transactionBufferFuture.completeExceptionally
+ (new BrokerServiceException.ServiceUnitNotReadyException(
+ "Transaction buffer recover failed to change the status to Ready,"
+ + "current state is: " + getState()));
} else {
timer.newTimeout(TopicTransactionBuffer.this,
takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
@@ -563,7 +569,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
// we store the maxReadPosition from snapshot then open the non-durable cursor by this topic's manageLedger.
// the non-durable cursor will read to lastConfirmedEntry.
- static class TopicTransactionBufferRecover implements Runnable {
+ @VisibleForTesting
+ public static class TopicTransactionBufferRecover implements Runnable {
private final PersistentTopic topic;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
index 728e7f0476b..53de549f69f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckReplyCallBack;
import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadata;
import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadataEntry;
@@ -53,8 +54,10 @@ public class MLPendingAckReplyCallBack implements PendingAckReplyCallBack {
log.info("Topic name : [{}], SubName : [{}] pending ack handle cache request success!",
pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
} else {
- log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail!",
- pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
+ log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail! current state: {}",
+ pendingAckHandle.getTopicName(), pendingAckHandle.getSubName(), pendingAckHandle.state);
+ replayFailed(new BrokerServiceException.ServiceUnitNotReadyException("Failed"
+ + " to change PendingAckHandle state to Ready, current state is : " + pendingAckHandle.state));
}
pendingAckHandle.handleCacheRequest();
});
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index edc16b57910..e95a811528c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -26,8 +26,10 @@ import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -36,6 +38,7 @@ import io.netty.buffer.Unpooled;
import io.netty.util.Timeout;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
@@ -45,9 +48,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.Bytes;
@@ -61,18 +68,27 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
+import org.apache.pulsar.broker.service.BacklogQuotaManager;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferRecoverCallBack;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckReplyCallBack;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
@@ -91,6 +107,7 @@ import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.events.EventType;
@@ -112,6 +129,8 @@ import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -1091,4 +1110,158 @@ public class TransactionTest extends TransactionTestBase {
// repeat ack the second message, can ack successful
consumer.acknowledgeAsync(messageId, txn3).get();
}
+
+ /**
+ * When change pending ack handle state failure, exceptionally complete cmd-subscribe.
+ * see: https://github.com/apache/pulsar/pull/16248.
+ */
+ @Test
+ public void testPendingAckReplayChangeStateError() throws InterruptedException, TimeoutException {
+ AtomicInteger atomicInteger = new AtomicInteger(1);
+ // Create Executor
+ ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ // Mock serviceConfiguration.
+ ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
+ when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true);
+ // Mock executorProvider.
+ ExecutorProvider executorProvider = mock(ExecutorProvider.class);
+ when(executorProvider.getExecutor()).thenReturn(executorService);
+ when(executorProvider.getExecutor(any(Object.class))).thenReturn(executorService);
+ // Mock pendingAckStore.
+ PendingAckStore pendingAckStore = mock(PendingAckStore.class);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ executorService.execute(()->{
+ PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) invocation.getArguments()[0];
+ pendingAckHandle.close();
+ MLPendingAckReplyCallBack mlPendingAckReplyCallBack
+ = new MLPendingAckReplyCallBack(pendingAckHandle);
+ mlPendingAckReplyCallBack.replayComplete();
+ });
+ return null;
+ }
+ }).when(pendingAckStore).replayAsync(any(), any());
+ // Mock executorProvider.
+ TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
+ when(pendingAckStoreProvider.checkInitializedBefore(any()))
+ .thenReturn(CompletableFuture.completedFuture(true));
+ when(pendingAckStoreProvider.newPendingAckStore(any()))
+ .thenReturn(CompletableFuture.completedFuture(pendingAckStore));
+ // Mock pulsar.
+ PulsarService pulsar = mock(PulsarService.class);
+ when(pulsar.getConfig()).thenReturn(serviceConfiguration);
+ when(pulsar.getTransactionExecutorProvider()).thenReturn(executorProvider);
+ when(pulsar.getTransactionPendingAckStoreProvider()).thenReturn(pendingAckStoreProvider);
+ // Mock brokerService.
+ BrokerService brokerService = mock(BrokerService.class);
+ when(brokerService.getPulsar()).thenReturn(pulsar);
+ // Mock topic.
+ PersistentTopic topic = mock(PersistentTopic.class);
+ when(topic.getBrokerService()).thenReturn(brokerService);
+ when(topic.getName()).thenReturn("topic-a");
+ // Mock cursor for subscription.
+ ManagedCursor cursor_subscription = mock(ManagedCursor.class);
+ doThrow(new RuntimeException("1")).when(cursor_subscription).updateLastActive();
+ // Create subscription.
+ String subscriptionName = "sub-a";
+ boolean replicated = false;
+ Map<String, String> subscriptionProperties = Collections.emptyMap();
+ PersistentSubscription persistentSubscription = new PersistentSubscription(topic, subscriptionName,
+ cursor_subscription, replicated, subscriptionProperties);
+ org.apache.pulsar.broker.service.Consumer consumer = mock(org.apache.pulsar.broker.service.Consumer.class);
+ try {
+ CompletableFuture<Void> addConsumerFuture = persistentSubscription.addConsumer(consumer);
+ addConsumerFuture.get(5, TimeUnit.SECONDS);
+ fail("Expect failure by PendingAckHandle closed, but success");
+ } catch (ExecutionException executionException){
+ Throwable t = executionException.getCause();
+ Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
+ }
+ }
+
+ /**
+ * When change TB state failure, exceptionally complete cmd-producer.
+ * see: https://github.com/apache/pulsar/pull/16248.
+ */
+ @Test
+ public void testTBRecoverChangeStateError() throws InterruptedException, TimeoutException {
+ final AtomicReference<PersistentTopic> persistentTopic = new AtomicReference<PersistentTopic>();
+ AtomicInteger atomicInteger = new AtomicInteger(1);
+ // Create Executor
+ ScheduledExecutorService executorService_recover = mock(ScheduledExecutorService.class);
+ // Mock serviceConfiguration.
+ ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
+ when(serviceConfiguration.isEnableReplicatedSubscriptions()).thenReturn(false);
+ when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true);
+ // Mock executorProvider.
+ ExecutorProvider executorProvider = mock(ExecutorProvider.class);
+ when(executorProvider.getExecutor(any(Object.class))).thenReturn(executorService_recover);
+ // Mock pendingAckStore.
+ PendingAckStore pendingAckStore = mock(PendingAckStore.class);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ new Thread(() -> {
+ TopicTransactionBuffer.TopicTransactionBufferRecover recover
+ = (TopicTransactionBuffer.TopicTransactionBufferRecover)invocation.getArguments()[0];
+ TopicTransactionBufferRecoverCallBack callBack
+ = Whitebox.getInternalState(recover, "callBack");;
+ try {
+ persistentTopic.get().getTransactionBuffer().closeAsync().get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ callBack.recoverComplete();
+ }).start();
+ return null;
+ }
+ }).when(executorService_recover).execute(any());
+ // Mock executorProvider.
+ TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
+ when(pendingAckStoreProvider.checkInitializedBefore(any()))
+ .thenReturn(CompletableFuture.completedFuture(true));
+ when(pendingAckStoreProvider.newPendingAckStore(any()))
+ .thenReturn(CompletableFuture.completedFuture(pendingAckStore));
+ // Mock TransactionBufferSnapshotService
+ TransactionBufferSnapshotService transactionBufferSnapshotService
+ = mock(TransactionBufferSnapshotService.class);
+ SystemTopicClient.Writer writer = mock(SystemTopicClient.Writer.class);
+ when(writer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+ when(transactionBufferSnapshotService.createWriter(any()))
+ .thenReturn(CompletableFuture.completedFuture(writer));
+ // Mock pulsar.
+ PulsarService pulsar = mock(PulsarService.class);
+ when(pulsar.getConfiguration()).thenReturn(serviceConfiguration);
+ when(pulsar.getConfig()).thenReturn(serviceConfiguration);
+ when(pulsar.getTransactionExecutorProvider()).thenReturn(executorProvider);
+ when(pulsar.getTransactionBufferSnapshotService()).thenReturn(transactionBufferSnapshotService);
+ TopicTransactionBufferProvider topicTransactionBufferProvider = new TopicTransactionBufferProvider();
+ when(pulsar.getTransactionBufferProvider()).thenReturn(topicTransactionBufferProvider);
+ // Mock BacklogQuotaManager
+ BacklogQuotaManager backlogQuotaManager = mock(BacklogQuotaManager.class);
+ // Mock brokerService.
+ BrokerService brokerService = mock(BrokerService.class);
+ when(brokerService.getPulsar()).thenReturn(pulsar);
+ when(brokerService.pulsar()).thenReturn(pulsar);
+ when(brokerService.getBacklogQuotaManager()).thenReturn(backlogQuotaManager);
+ // Mock managedLedger.
+ ManagedLedgerImpl managedLedger = mock(ManagedLedgerImpl.class);
+ ManagedCursorContainer managedCursors = new ManagedCursorContainer();
+ when(managedLedger.getCursors()).thenReturn(managedCursors);
+ PositionImpl position = PositionImpl.EARLIEST;
+ when(managedLedger.getLastConfirmedEntry()).thenReturn(position);
+ // Create topic.
+ persistentTopic.set(new PersistentTopic("topic-a", managedLedger, brokerService));
+ try {
+ // Do check.
+ persistentTopic.get().checkIfTransactionBufferRecoverCompletely(true).get(5, TimeUnit.SECONDS);
+ fail("Expect failure by TB closed, but it is finished.");
+ } catch (ExecutionException executionException){
+ Throwable t = executionException.getCause();
+ Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
+ }
+ }
}