You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2023/02/09 05:42:19 UTC
[pulsar] branch branch-2.10 updated: [fix][txn] fix txn coordinator recover handle committing and aborting txn race condition. (#19201)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 5dd13ec80f2 [fix][txn] fix txn coordinator recover handle committing and aborting txn race condition. (#19201)
5dd13ec80f2 is described below
commit 5dd13ec80f2a88761bec54de7420704ec8d8e059
Author: thetumbled <52...@users.noreply.github.com>
AuthorDate: Wed Feb 1 09:06:09 2023 +0800
[fix][txn] fix txn coordinator recover handle committing and aborting txn race condition. (#19201)
Fixes #19200
A transaction can last for a long time without being aborted, which prevents the TB's MaxReadPosition from moving, so no new snapshot is taken. With an old snapshot, the TB has to read a lot of entries while doing recovery.
In the worst cases, topics are unavailable for 30 minutes.
Avoid concurrent execution when handling committing and aborting transactions during transaction coordinator recovery.
(cherry picked from commit 96f4161056b84419d69f62635d6cdedebd1b9198)
---
.../broker/TransactionMetadataStoreService.java | 118 +++++++++++----------
.../impl/TransactionMetaStoreHandlerTest.java | 2 +-
.../impl/MLTransactionMetadataStore.java | 2 +-
3 files changed, 65 insertions(+), 57 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 902546958c5..bc29c2f2ee8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -186,54 +186,66 @@ public class TransactionMetadataStoreService {
return;
}
- openTransactionMetadataStore(tcId).thenAccept((store) -> internalPinnedExecutor.execute(() -> {
- stores.put(tcId, store);
- LOG.info("Added new transaction meta store {}", tcId);
- long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
- while (true) {
- // prevent thread in a busy loop.
- if (System.currentTimeMillis() < endTime) {
- CompletableFuture<Void> future = deque.poll();
- if (future != null) {
- // complete queue request future
- future.complete(null);
- } else {
- break;
- }
- } else {
- deque.clear();
- break;
- }
- }
-
- completableFuture.complete(null);
- tcLoadSemaphore.release();
- })).exceptionally(e -> {
- internalPinnedExecutor.execute(() -> {
- completableFuture.completeExceptionally(e.getCause());
- // release before handle request queue,
- //in order to client reconnect infinite loop
- tcLoadSemaphore.release();
- long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
- while (true) {
- // prevent thread in a busy loop.
- if (System.currentTimeMillis() < endTime) {
- CompletableFuture<Void> future = deque.poll();
- if (future != null) {
- // this means that this tc client connection connect fail
- future.completeExceptionally(e);
- } else {
- break;
- }
+ TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId);
+ TransactionRecoverTracker recoverTracker =
+ new TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
+ timeoutTracker, tcId.getId());
+ openTransactionMetadataStore(tcId, timeoutTracker, recoverTracker).thenAccept(
+ store -> internalPinnedExecutor.execute(() -> {
+ // TransactionMetadataStore initialization
+ // need to use TransactionMetadataStore itself.
+ // we need to put store into stores map before
+ // handle committing and aborting transaction.
+ stores.put(tcId, store);
+ LOG.info("Added new transaction meta store {}", tcId);
+ recoverTracker.handleCommittingAndAbortingTransaction();
+ timeoutTracker.start();
+
+ long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
+ while (true) {
+ // prevent thread in a busy loop.
+ if (System.currentTimeMillis() < endTime) {
+ CompletableFuture<Void> future = deque.poll();
+ if (future != null) {
+ // complete queue request future
+ future.complete(null);
} else {
- deque.clear();
break;
}
+ } else {
+ deque.clear();
+ break;
}
- LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
- });
- return null;
- });
+ }
+
+ completableFuture.complete(null);
+ tcLoadSemaphore.release();
+ })).exceptionally(e -> {
+ internalPinnedExecutor.execute(() -> {
+ completableFuture.completeExceptionally(e.getCause());
+ // release before handle request queue,
+ //in order to client reconnect infinite loop
+ tcLoadSemaphore.release();
+ long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
+ while (true) {
+ // prevent thread in a busy loop.
+ if (System.currentTimeMillis() < endTime) {
+ CompletableFuture<Void> future = deque.poll();
+ if (future != null) {
+ // this means that this tc client connection connect fail
+ future.completeExceptionally(e);
+ } else {
+ break;
+ }
+ } else {
+ deque.clear();
+ break;
+ }
+ }
+ LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
+ });
+ return null;
+ });
} else {
// only one command can open transaction metadata store,
// other will be added to the deque, when the op of openTransactionMetadataStore finished
@@ -253,17 +265,13 @@ public class TransactionMetadataStoreService {
return completableFuture;
}
- public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(TransactionCoordinatorID tcId) {
- return pulsarService.getBrokerService()
- .getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(v -> {
- TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId);
- TransactionRecoverTracker recoverTracker =
- new TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
- timeoutTracker, tcId.getId());
- return transactionMetadataStoreProvider
- .openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
- timeoutTracker, recoverTracker);
- });
+ public CompletableFuture<TransactionMetadataStore>
+ openTransactionMetadataStore(TransactionCoordinatorID tcId,
+ TransactionTimeoutTracker timeoutTracker,
+ TransactionRecoverTracker recoverTracker) {
+ return pulsarService.getBrokerService().getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(
+ v -> transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
+ timeoutTracker, recoverTracker));
}
public CompletableFuture<Void> removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandlerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandlerTest.java
index 543bb35c4ea..d7d0a2be00b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandlerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandlerTest.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 273a01850cb..d5275e30d79 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -119,9 +119,9 @@ public class MLTransactionMetadataStore
+ tcID.toString() + " change state to Ready error when init it"));
} else {
+ completableFuture.complete(MLTransactionMetadataStore.this);
recoverTracker.handleCommittingAndAbortingTransaction();
timeoutTracker.start();
- completableFuture.complete(MLTransactionMetadataStore.this);
}
}