You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/12/17 01:22:43 UTC
[pulsar] branch master updated: [fix] [tx] Transaction buffer recover blocked by readNext (#18833)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b9446aa01d4 [fix] [tx] Transaction buffer recover blocked by readNext (#18833)
b9446aa01d4 is described below
commit b9446aa01d4c22a170e305cb4a8fbd2966eaff74
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Sat Dec 17 09:22:36 2022 +0800
[fix] [tx] Transaction buffer recover blocked by readNext (#18833)
---
.../SingleSnapshotAbortedTxnProcessorImpl.java | 25 +++++--
.../TopicTransactionBufferRecoverTest.java | 81 ++++++++++++++++++++++
2 files changed, 101 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index a13dd0499a6..f8d0d323912 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -21,11 +21,14 @@ package org.apache.pulsar.broker.transaction.buffer.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
@@ -33,6 +36,7 @@ import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
@@ -78,16 +82,21 @@ public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcesso
return aborts.containsKey(txnID);
}
+ private long getSystemClientOperationTimeoutMs() throws Exception {
+ PulsarClientImpl pulsarClient = (PulsarClientImpl) topic.getBrokerService().getPulsar().getClient();
+ return pulsarClient.getConfiguration().getOperationTimeoutMs();
+ }
@Override
public CompletableFuture<PositionImpl> recoverFromSnapshot() {
return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
.getTxnBufferSnapshotService()
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
- PositionImpl startReadCursorPosition = null;
try {
+ PositionImpl startReadCursorPosition = null;
while (reader.hasMoreEvents()) {
- Message<TransactionBufferSnapshot> message = reader.readNext();
+ Message<TransactionBufferSnapshot> message = reader.readNextAsync()
+ .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
if (topic.getName().equals(message.getKey())) {
TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
if (transactionBufferSnapshot != null) {
@@ -98,15 +107,21 @@ public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcesso
}
}
}
- closeReader(reader);
return CompletableFuture.completedFuture(startReadCursorPosition);
+ } catch (TimeoutException ex) {
+ Throwable t = FutureUtil.unwrapCompletionException(ex);
+ String errorMessage = String.format("[%s] Transaction buffer recover fail by read "
+ + "transactionBufferSnapshot timeout!", topic.getName());
+ log.error(errorMessage, t);
+ return FutureUtil.failedFuture(
+ new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
} catch (Exception ex) {
log.error("[{}] Transaction buffer recover fail when read "
+ "transactionBufferSnapshot!", topic.getName(), ex);
- closeReader(reader);
return FutureUtil.failedFuture(ex);
+ } finally {
+ closeReader(reader);
}
-
}, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
.getExecutor(this));
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 39c324d92f3..a2b72fc458d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -20,9 +20,12 @@ package org.apache.pulsar.broker.transaction;
import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -35,8 +38,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -104,6 +109,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
+ conf.getProperties().setProperty("brokerClient_operationTimeoutMs", Integer.valueOf(10 * 1000).toString());
setUpBase(1, NUM_PARTITIONS, RECOVER_COMMIT, 0);
admin.topics().createNonPartitionedTopic(RECOVER_ABORT);
admin.topics().createNonPartitionedTopic(TAKE_SNAPSHOT);
@@ -248,6 +254,81 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
}
+ private void makeTBSnapshotReaderTimeoutIfFirstRead(TopicName topicName) throws Exception {
+ SystemTopicClient.Reader mockReader = mock(SystemTopicClient.Reader.class);
+ AtomicBoolean isFirstCallOfMethodHasMoreEvents = new AtomicBoolean();
+ AtomicBoolean isFirstCallOfMethodHasReadNext = new AtomicBoolean();
+ AtomicBoolean isFirstCallOfMethodHasReadNextAsync = new AtomicBoolean();
+
+ doAnswer(invocation -> {
+ if (isFirstCallOfMethodHasMoreEvents.compareAndSet(false,true)){
+ return true;
+ } else {
+ return false;
+ }
+ }).when(mockReader).hasMoreEvents();
+
+ doAnswer(invocation -> {
+ if (isFirstCallOfMethodHasReadNext.compareAndSet(false, true)){
+ // Block the calling thread for an hour to simulate a read that never returns.
+ Thread.sleep(3600 * 1000);
+ }
+ return null;
+ }).when(mockReader).readNext();
+
+ doAnswer(invocation -> {
+ CompletableFuture<Message> future = new CompletableFuture<>();
+ new Thread(() -> {
+ if (isFirstCallOfMethodHasReadNextAsync.compareAndSet(false, true)){
+ // Block this helper thread for an hour so the returned future stays incomplete.
+ try {
+ Thread.sleep(3600 * 1000);
+ } catch (InterruptedException e) {
+ }
+ future.complete(null);
+ } else {
+ future.complete(null);
+ }
+ }).start();
+ return future;
+ }).when(mockReader).readNextAsync();
+
+ when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+ for (PulsarService pulsarService : pulsarServiceList){
+ // Swap in a spied snapshot service whose createReader(topicName) returns the mock reader.
+ final SystemTopicTxnBufferSnapshotService tbSnapshotService =
+ pulsarService.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService();
+ SystemTopicTxnBufferSnapshotService spyTbSnapshotService = spy(tbSnapshotService);
+ doAnswer(invocation -> CompletableFuture.completedFuture(mockReader))
+ .when(spyTbSnapshotService).createReader(topicName);
+ Field field =
+ TransactionBufferSnapshotServiceFactory.class.getDeclaredField("txnBufferSnapshotService");
+ field.setAccessible(true);
+ field.set(pulsarService.getTransactionBufferSnapshotServiceFactory(), spyTbSnapshotService);
+ }
+ }
+
+ @Test(timeOut = 60 * 1000)
+ public void testTBRecoverCanRetryIfTimeoutRead() throws Exception {
+ String topicName = String.format("persistent://%s/%s", NAMESPACE1,
+ "tx_recover_" + UUID.randomUUID().toString().replaceAll("-", "_"));
+
+ // Make the first read of the transaction buffer snapshot hang, so that recovery hits the read timeout.
+ makeTBSnapshotReaderTimeoutIfFirstRead(TopicName.get(topicName));
+ // Verify that creating a producer still succeeds: the PRODUCER command waits for TB recovery to finish.
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .enableBatching(false)
+ .batchingMaxMessages(2)
+ .create();
+
+ // cleanup.
+ producer.close();
+ admin.topics().delete(topicName, false);
+ }
+
private void testTakeSnapshot() throws Exception {
@Cleanup
Producer<String> producer = pulsarClient