You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this rendering.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/12/17 01:22:43 UTC

[pulsar] branch master updated: [fix] [tx] Transaction buffer recover blocked by readNext (#18833)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b9446aa01d4 [fix] [tx] Transaction buffer recover blocked by readNext (#18833)
b9446aa01d4 is described below

commit b9446aa01d4c22a170e305cb4a8fbd2966eaff74
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Sat Dec 17 09:22:36 2022 +0800

    [fix] [tx] Transaction buffer recover blocked by readNext (#18833)
---
 .../SingleSnapshotAbortedTxnProcessorImpl.java     | 25 +++++--
 .../TopicTransactionBufferRecoverTest.java         | 81 ++++++++++++++++++++++
 2 files changed, 101 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index a13dd0499a6..f8d0d323912 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -21,11 +21,14 @@ package org.apache.pulsar.broker.transaction.buffer.impl;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
@@ -33,6 +36,7 @@ import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
 import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
@@ -78,16 +82,21 @@ public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcesso
         return aborts.containsKey(txnID);
     }
 
+    private long getSystemClientOperationTimeoutMs() throws Exception {
+        PulsarClientImpl brokerClient = (PulsarClientImpl) topic.getBrokerService().getPulsar().getClient();
+        return brokerClient.getConfiguration().getOperationTimeoutMs(); // timeout applied to snapshot reads
+    }
 
     @Override
     public CompletableFuture<PositionImpl> recoverFromSnapshot() {
         return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
                 .getTxnBufferSnapshotService()
                 .createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
-                    PositionImpl startReadCursorPosition = null;
                     try {
+                    PositionImpl startReadCursorPosition = null;
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message = reader.readNextAsync()
+                                    .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
                             if (topic.getName().equals(message.getKey())) {
                                 TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
                                 if (transactionBufferSnapshot != null) {
@@ -98,15 +107,21 @@ public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcesso
                                 }
                             }
                         }
-                        closeReader(reader);
                         return CompletableFuture.completedFuture(startReadCursorPosition);
+                    } catch (TimeoutException ex) {
+                        Throwable t = FutureUtil.unwrapCompletionException(ex);
+                        String errorMessage = String.format("[%s] Transaction buffer recover fail by read "
+                                + "transactionBufferSnapshot timeout!", topic.getName());
+                        log.error(errorMessage, t);
+                        return FutureUtil.failedFuture(
+                                new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
                     } catch (Exception ex) {
                         log.error("[{}] Transaction buffer recover fail when read "
                                 + "transactionBufferSnapshot!", topic.getName(), ex);
-                        closeReader(reader);
                         return FutureUtil.failedFuture(ex);
+                    } finally {
+                        closeReader(reader);
                     }
-
                 },  topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
                         .getExecutor(this));
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 39c324d92f3..a2b72fc458d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -20,9 +20,12 @@ package org.apache.pulsar.broker.transaction;
 
 import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -35,8 +38,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -104,6 +109,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
     private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
+        conf.getProperties().setProperty("brokerClient_operationTimeoutMs", Integer.valueOf(10 * 1000).toString());
         setUpBase(1, NUM_PARTITIONS, RECOVER_COMMIT, 0);
         admin.topics().createNonPartitionedTopic(RECOVER_ABORT);
         admin.topics().createNonPartitionedTopic(TAKE_SNAPSHOT);
@@ -248,6 +254,81 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
 
     }
 
+    /**
+     * Replaces the transaction-buffer snapshot service of every broker with a spy whose
+     * {@code createReader(topicName)} returns one shared mock reader. The mock reports a single
+     * pending event and its first {@code readNextAsync()} never completes, so the first recovery
+     * attempt can only end by timing out; later async reads complete immediately with {@code null}.
+     *
+     * @param topicName the topic whose snapshot reader should be replaced by the mock
+     * @throws Exception if the service field cannot be replaced reflectively
+     */
+    private void makeTBSnapshotReaderTimeoutIfFirstRead(TopicName topicName) throws Exception {
+        SystemTopicClient.Reader mockReader = mock(SystemTopicClient.Reader.class);
+        AtomicBoolean hasMoreEventsCalled = new AtomicBoolean();
+        AtomicBoolean readNextCalled = new AtomicBoolean();
+        AtomicBoolean readNextAsyncCalled = new AtomicBoolean();
+
+        // Only the very first call reports a pending event; every later call reports none.
+        doAnswer(invocation -> hasMoreEventsCalled.compareAndSet(false, true))
+                .when(mockReader).hasMoreEvents();
+
+        // Recovery is expected to use readNextAsync(); if blocking readNext() is called, hang the caller.
+        doAnswer(invocation -> {
+            if (readNextCalled.compareAndSet(false, true)) {
+                // Park the caller to emulate an unbounded blocking read.
+                Thread.sleep(3600 * 1000);
+            }
+            return null;
+        }).when(mockReader).readNext();
+
+        // The first async read is never completed, emulating a stuck read without parking a
+        // helper thread; any later read completes immediately with null.
+        doAnswer(invocation -> {
+            if (readNextAsyncCalled.compareAndSet(false, true)) {
+                return new CompletableFuture<Message>();
+            }
+            return CompletableFuture.completedFuture(null);
+        }).when(mockReader).readNextAsync();
+
+        // Closing the reader after a recovery attempt succeeds immediately.
+        when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        for (PulsarService pulsarService : pulsarServiceList) {
+            // Swap in a spy so that createReader(topicName) hands out the mock reader.
+            final SystemTopicTxnBufferSnapshotService tbSnapshotService =
+                    pulsarService.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService();
+            SystemTopicTxnBufferSnapshotService spyTbSnapshotService = spy(tbSnapshotService);
+            doAnswer(invocation -> CompletableFuture.completedFuture(mockReader))
+                    .when(spyTbSnapshotService).createReader(topicName);
+            // Replace the factory's service field reflectively (presumably no setter is exposed).
+            Field field =
+                    TransactionBufferSnapshotServiceFactory.class.getDeclaredField("txnBufferSnapshotService");
+            field.setAccessible(true);
+            field.set(pulsarService.getTransactionBufferSnapshotServiceFactory(), spyTbSnapshotService);
+        }
+    }
+
+    /**
+     * Verifies that when the first snapshot read hangs, transaction-buffer recovery fails by timeout
+     * and is retried, so creating a producer (which waits for recovery to finish) still succeeds.
+     */
+    @Test(timeOut = 60 * 1000)
+    public void testTBRecoverCanRetryIfTimeoutRead() throws Exception {
+        String topicName = String.format("persistent://%s/%s", NAMESPACE1,
+                "tx_recover_" + UUID.randomUUID().toString().replace('-', '_'));
+        // Make the first snapshot read hang so the first recovery attempt can only time out.
+        makeTBSnapshotReaderTimeoutIfFirstRead(TopicName.get(topicName));
+        // Producer creation waits for recovery, so a successful create() proves recovery completed.
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .enableBatching(false)
+                .create();
+        producer.close();
+        admin.topics().delete(topicName, false);
+    }
+
     private void testTakeSnapshot() throws Exception {
         @Cleanup
         Producer<String> producer = pulsarClient