You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/06/30 14:33:24 UTC

[pulsar] branch master updated: [fix] [transaction] Cmd-Subscribe and Cmd-Producer will not succeed even after 100 retries (#16248)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ac7331e06e5 [fix] [transaction] Cmd-Subscribe and Cmd-Producer will not succeed even after 100 retries (#16248)
ac7331e06e5 is described below

commit ac7331e06e597851a75e544130c0602a780a62cd
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu Jun 30 22:33:17 2022 +0800

    [fix] [transaction] Cmd-Subscribe and Cmd-Producer will not succeed even after 100 retries (#16248)
    
    ### Motivation
    For example, suppose `client.lookup(by producer)`, `topic.unload`, and `consumer.subscribe` are executed at the same time:
    
    | Time | `client.lookup(by producer)` | `topic.unload` | `consumer.subscribe` |
    | ----------- | ----------- | ----------- | ----------- |
    | 1 |  |  | `ServerCnx.consumers.putIfAbsent(consumerId, consumerFuture)` |
    | 2 | get existing persistent topic |  |  |
    | 3 | create a new persistent subscription |  |  |
    | 4 | create a new pending ack handle |  |  |
    | 5 | replay async |  |  |
    | 6 |  |  | wait for pending ack log replay to finish |
    | 6 |  | topic.close |  |
    | 7 |  | async close subscription |  |
    | 8 |  | change pending ack handle state  --> `close` |  |
    | 9 | change pending ack handle state `init` --> `ready` |  |  |
    | 10 | change state failure |  |  |
    | 11 |  |  | subscribe timeout |
    | 12 |  |  | retry subscribe |
    | 13 |  |  | get exists `consumerFuture` in `ServerCnx.consumers` |
    | 14 |  |  | wait for pending ack log replay to finish |
    | 15 |  |  | subscribe timeout |
    | 16 |  |  | ......    (loop step12 ~ step15) |
    
    Steps 6 and 14: `PersistentSubscription.addConsumer` waits for the pending ack replay to finish.
    
    https://github.com/apache/pulsar/blob/ebcc47ee7ceb43f680640ad72e51a06d9856458d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L205-L206
    
    Step 7: Because `PersistentSubscription.addConsumer` has not finished, no consumers are closed.
    
    Step 10: A failure to change the pending ack handle state does not complete `pendingAckHandleFuture`, so anything waiting on it never proceeds.
    
    https://github.com/apache/pulsar/blob/ebcc47ee7ceb43f680640ad72e51a06d9856458d/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java#L46-L61
    
    From step 11 onward, Cmd-Subscribe never succeeds, even after 100 retries.
    
    ### Modifications
    
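    When changing the pending ack handle state fails, complete `pendingAckHandleFuture` exceptionally. Likewise, when the transaction buffer fails to change to the Ready state after recovery, complete `transactionBufferFuture` exceptionally, so Cmd-Producer receives an error instead of waiting on a future that never completes.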
---
 .../buffer/impl/TopicTransactionBuffer.java        |  11 +-
 .../pendingack/impl/MLPendingAckReplyCallBack.java |   7 +-
 .../pulsar/broker/transaction/TransactionTest.java | 173 +++++++++++++++++++++
 3 files changed, 187 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 39855106cc6..ccd6bc2b5ea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
@@ -131,7 +132,12 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                                 maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
                             }
                             if (!changeToReadyState()) {
-                                log.error("[{}]Transaction buffer recover fail", topic.getName());
+                                log.error("[{}]Transaction buffer recover fail, current state: {}",
+                                        topic.getName(), getState());
+                                transactionBufferFuture.completeExceptionally
+                                        (new BrokerServiceException.ServiceUnitNotReadyException(
+                                                "Transaction buffer recover failed to change the status to Ready,"
+                                                        + "current state is: " + getState()));
                             } else {
                                 timer.newTimeout(TopicTransactionBuffer.this,
                                         takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
@@ -563,7 +569,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
 
     // we store the maxReadPosition from snapshot then open the non-durable cursor by this topic's manageLedger.
     // the non-durable cursor will read to lastConfirmedEntry.
-    static class TopicTransactionBufferRecover implements Runnable {
+    @VisibleForTesting
+    public static class TopicTransactionBufferRecover implements Runnable {
 
         private final PersistentTopic topic;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
index 728e7f0476b..53de549f69f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckReplyCallBack;
 import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadata;
 import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadataEntry;
@@ -53,8 +54,10 @@ public class MLPendingAckReplyCallBack implements PendingAckReplyCallBack {
                 log.info("Topic name : [{}], SubName : [{}] pending ack handle cache request success!",
                         pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
             } else {
-                log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail!",
-                        pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
+                log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail! current state: {}",
+                        pendingAckHandle.getTopicName(), pendingAckHandle.getSubName(), pendingAckHandle.state);
+                replayFailed(new BrokerServiceException.ServiceUnitNotReadyException("Failed"
+                        + " to change PendingAckHandle state to Ready, current state is : " + pendingAckHandle.state));
             }
             pendingAckHandle.handleCacheRequest();
         });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index edc16b57910..e95a811528c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -26,8 +26,10 @@ import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
@@ -36,6 +38,7 @@ import io.netty.buffer.Unpooled;
 import io.netty.util.Timeout;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.List;
@@ -45,9 +48,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.util.Bytes;
@@ -61,18 +68,27 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
+import org.apache.pulsar.broker.service.BacklogQuotaManager;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferRecoverCallBack;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
 import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckReplyCallBack;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
 import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
@@ -91,6 +107,7 @@ import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.events.EventType;
@@ -112,6 +129,8 @@ import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -1091,4 +1110,158 @@ public class TransactionTest extends TransactionTestBase {
         // repeat ack the second message, can ack successful
         consumer.acknowledgeAsync(messageId, txn3).get();
     }
+
+    /**
+     * When change pending ack handle state failure, exceptionally complete cmd-subscribe.
+     * see: https://github.com/apache/pulsar/pull/16248.
+     */
+    @Test
+    public void testPendingAckReplayChangeStateError() throws InterruptedException, TimeoutException {
+        AtomicInteger atomicInteger = new AtomicInteger(1);
+        // Create Executor
+        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+        // Mock serviceConfiguration.
+        ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
+        when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true);
+        // Mock executorProvider.
+        ExecutorProvider executorProvider = mock(ExecutorProvider.class);
+        when(executorProvider.getExecutor()).thenReturn(executorService);
+        when(executorProvider.getExecutor(any(Object.class))).thenReturn(executorService);
+        // Mock pendingAckStore.
+        PendingAckStore pendingAckStore = mock(PendingAckStore.class);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                executorService.execute(()->{
+                    PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) invocation.getArguments()[0];
+                    pendingAckHandle.close();
+                    MLPendingAckReplyCallBack mlPendingAckReplyCallBack
+                            = new MLPendingAckReplyCallBack(pendingAckHandle);
+                    mlPendingAckReplyCallBack.replayComplete();
+                });
+                return null;
+            }
+        }).when(pendingAckStore).replayAsync(any(), any());
+        // Mock executorProvider.
+        TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
+        when(pendingAckStoreProvider.checkInitializedBefore(any()))
+                .thenReturn(CompletableFuture.completedFuture(true));
+        when(pendingAckStoreProvider.newPendingAckStore(any()))
+                .thenReturn(CompletableFuture.completedFuture(pendingAckStore));
+        // Mock pulsar.
+        PulsarService pulsar = mock(PulsarService.class);
+        when(pulsar.getConfig()).thenReturn(serviceConfiguration);
+        when(pulsar.getTransactionExecutorProvider()).thenReturn(executorProvider);
+        when(pulsar.getTransactionPendingAckStoreProvider()).thenReturn(pendingAckStoreProvider);
+        // Mock brokerService.
+        BrokerService brokerService = mock(BrokerService.class);
+        when(brokerService.getPulsar()).thenReturn(pulsar);
+        // Mock topic.
+        PersistentTopic topic = mock(PersistentTopic.class);
+        when(topic.getBrokerService()).thenReturn(brokerService);
+        when(topic.getName()).thenReturn("topic-a");
+        // Mock cursor for subscription.
+        ManagedCursor cursor_subscription = mock(ManagedCursor.class);
+        doThrow(new RuntimeException("1")).when(cursor_subscription).updateLastActive();
+        // Create subscription.
+        String subscriptionName = "sub-a";
+        boolean replicated = false;
+        Map<String, String> subscriptionProperties = Collections.emptyMap();
+        PersistentSubscription persistentSubscription = new PersistentSubscription(topic, subscriptionName,
+                cursor_subscription, replicated, subscriptionProperties);
+        org.apache.pulsar.broker.service.Consumer consumer = mock(org.apache.pulsar.broker.service.Consumer.class);
+        try {
+            CompletableFuture<Void> addConsumerFuture = persistentSubscription.addConsumer(consumer);
+            addConsumerFuture.get(5, TimeUnit.SECONDS);
+            fail("Expect failure by PendingAckHandle closed, but success");
+        } catch (ExecutionException executionException){
+            Throwable t = executionException.getCause();
+            Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
+        }
+    }
+
+    /**
+     * When change TB state failure, exceptionally complete cmd-producer.
+     * see: https://github.com/apache/pulsar/pull/16248.
+     */
+    @Test
+    public void testTBRecoverChangeStateError() throws InterruptedException, TimeoutException {
+        final AtomicReference<PersistentTopic> persistentTopic = new AtomicReference<PersistentTopic>();
+        AtomicInteger atomicInteger = new AtomicInteger(1);
+        // Create Executor
+        ScheduledExecutorService executorService_recover = mock(ScheduledExecutorService.class);
+        // Mock serviceConfiguration.
+        ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
+        when(serviceConfiguration.isEnableReplicatedSubscriptions()).thenReturn(false);
+        when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true);
+        // Mock executorProvider.
+        ExecutorProvider executorProvider = mock(ExecutorProvider.class);
+        when(executorProvider.getExecutor(any(Object.class))).thenReturn(executorService_recover);
+        // Mock pendingAckStore.
+        PendingAckStore pendingAckStore = mock(PendingAckStore.class);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                new Thread(() -> {
+                    TopicTransactionBuffer.TopicTransactionBufferRecover recover
+                            = (TopicTransactionBuffer.TopicTransactionBufferRecover)invocation.getArguments()[0];
+                    TopicTransactionBufferRecoverCallBack callBack
+                            = Whitebox.getInternalState(recover, "callBack");;
+                    try {
+                        persistentTopic.get().getTransactionBuffer().closeAsync().get();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    } catch (ExecutionException e) {
+                        throw new RuntimeException(e);
+                    }
+                    callBack.recoverComplete();
+                }).start();
+                return null;
+            }
+        }).when(executorService_recover).execute(any());
+        // Mock executorProvider.
+        TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
+        when(pendingAckStoreProvider.checkInitializedBefore(any()))
+                .thenReturn(CompletableFuture.completedFuture(true));
+        when(pendingAckStoreProvider.newPendingAckStore(any()))
+                .thenReturn(CompletableFuture.completedFuture(pendingAckStore));
+        // Mock TransactionBufferSnapshotService
+        TransactionBufferSnapshotService transactionBufferSnapshotService
+                = mock(TransactionBufferSnapshotService.class);
+        SystemTopicClient.Writer writer = mock(SystemTopicClient.Writer.class);
+        when(writer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+        when(transactionBufferSnapshotService.createWriter(any()))
+                .thenReturn(CompletableFuture.completedFuture(writer));
+        // Mock pulsar.
+        PulsarService pulsar = mock(PulsarService.class);
+        when(pulsar.getConfiguration()).thenReturn(serviceConfiguration);
+        when(pulsar.getConfig()).thenReturn(serviceConfiguration);
+        when(pulsar.getTransactionExecutorProvider()).thenReturn(executorProvider);
+        when(pulsar.getTransactionBufferSnapshotService()).thenReturn(transactionBufferSnapshotService);
+        TopicTransactionBufferProvider topicTransactionBufferProvider = new TopicTransactionBufferProvider();
+        when(pulsar.getTransactionBufferProvider()).thenReturn(topicTransactionBufferProvider);
+        // Mock BacklogQuotaManager
+        BacklogQuotaManager backlogQuotaManager = mock(BacklogQuotaManager.class);
+        // Mock brokerService.
+        BrokerService brokerService = mock(BrokerService.class);
+        when(brokerService.getPulsar()).thenReturn(pulsar);
+        when(brokerService.pulsar()).thenReturn(pulsar);
+        when(brokerService.getBacklogQuotaManager()).thenReturn(backlogQuotaManager);
+        // Mock managedLedger.
+        ManagedLedgerImpl managedLedger = mock(ManagedLedgerImpl.class);
+        ManagedCursorContainer managedCursors = new ManagedCursorContainer();
+        when(managedLedger.getCursors()).thenReturn(managedCursors);
+        PositionImpl position = PositionImpl.EARLIEST;
+        when(managedLedger.getLastConfirmedEntry()).thenReturn(position);
+        // Create topic.
+        persistentTopic.set(new PersistentTopic("topic-a", managedLedger, brokerService));
+        try {
+            // Do check.
+            persistentTopic.get().checkIfTransactionBufferRecoverCompletely(true).get(5, TimeUnit.SECONDS);
+            fail("Expect failure by TB closed, but it is finished.");
+        } catch (ExecutionException executionException){
+            Throwable t = executionException.getCause();
+            Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
+        }
+    }
 }