You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2023/02/09 05:42:19 UTC

[pulsar] branch branch-2.10 updated: [fix][txn] fix txn coordinator recover handle committing and aborting txn race condition. (#19201)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 5dd13ec80f2 [fix][txn] fix txn coordinator recover handle committing and aborting txn race condition. (#19201)
5dd13ec80f2 is described below

commit 5dd13ec80f2a88761bec54de7420704ec8d8e059
Author: thetumbled <52...@users.noreply.github.com>
AuthorDate: Wed Feb 1 09:06:09 2023 +0800

    [fix][txn] fix txn coordinator recover handle committing and aborting txn race condition. (#19201)
    
    Fixes #19200
    
    Transactions could last for a long time without being aborted. This caused the transaction buffer's (TB's) MaxReadPosition to stop advancing, so the TB did not take new snapshots. With an old snapshot, the TB has to read a large number of entries during recovery.
    In the worst case, topics were unavailable for 30 minutes.
    
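    The fix avoids concurrent execution. The service now puts the opened transaction metadata store into the `stores` map before calling `handleCommittingAndAbortingTransaction()`, and it starts the timeout tracker only after that call.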
    
    (cherry picked from commit 96f4161056b84419d69f62635d6cdedebd1b9198)
---
 .../broker/TransactionMetadataStoreService.java    | 118 +++++++++++----------
 .../impl/TransactionMetaStoreHandlerTest.java      |   2 +-
 .../impl/MLTransactionMetadataStore.java           |   2 +-
 3 files changed, 65 insertions(+), 57 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 902546958c5..bc29c2f2ee8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -186,54 +186,66 @@ public class TransactionMetadataStoreService {
                             return;
                         }
 
-                        openTransactionMetadataStore(tcId).thenAccept((store) -> internalPinnedExecutor.execute(() -> {
-                            stores.put(tcId, store);
-                            LOG.info("Added new transaction meta store {}", tcId);
-                            long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
-                            while (true) {
-                                // prevent thread in a busy loop.
-                                if (System.currentTimeMillis() < endTime) {
-                                    CompletableFuture<Void> future = deque.poll();
-                                    if (future != null) {
-                                        // complete queue request future
-                                        future.complete(null);
-                                    } else {
-                                        break;
-                                    }
-                                } else {
-                                    deque.clear();
-                                    break;
-                                }
-                            }
-
-                            completableFuture.complete(null);
-                            tcLoadSemaphore.release();
-                        })).exceptionally(e -> {
-                            internalPinnedExecutor.execute(() -> {
-                                        completableFuture.completeExceptionally(e.getCause());
-                                        // release before handle request queue,
-                                        //in order to client reconnect infinite loop
-                                        tcLoadSemaphore.release();
-                                        long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
-                                        while (true) {
-                                            // prevent thread in a busy loop.
-                                            if (System.currentTimeMillis() < endTime) {
-                                                CompletableFuture<Void> future = deque.poll();
-                                                if (future != null) {
-                                                    // this means that this tc client connection connect fail
-                                                    future.completeExceptionally(e);
-                                                } else {
-                                                    break;
-                                                }
+                        TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId);
+                        TransactionRecoverTracker recoverTracker =
+                                new TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
+                                        timeoutTracker, tcId.getId());
+                        openTransactionMetadataStore(tcId, timeoutTracker, recoverTracker).thenAccept(
+                                store -> internalPinnedExecutor.execute(() -> {
+                                    // TransactionMetadataStore initialization
+                                    // need to use TransactionMetadataStore itself.
+                                    // we need to put store into stores map before
+                                    // handle committing and aborting transaction.
+                                    stores.put(tcId, store);
+                                    LOG.info("Added new transaction meta store {}", tcId);
+                                    recoverTracker.handleCommittingAndAbortingTransaction();
+                                    timeoutTracker.start();
+
+                                    long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
+                                    while (true) {
+                                        // prevent thread in a busy loop.
+                                        if (System.currentTimeMillis() < endTime) {
+                                            CompletableFuture<Void> future = deque.poll();
+                                            if (future != null) {
+                                                // complete queue request future
+                                                future.complete(null);
                                             } else {
-                                                deque.clear();
                                                 break;
                                             }
+                                        } else {
+                                            deque.clear();
+                                            break;
                                         }
-                                        LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
-                                    });
-                                    return null;
-                                });
+                                    }
+
+                                    completableFuture.complete(null);
+                                    tcLoadSemaphore.release();
+                                })).exceptionally(e -> {
+                            internalPinnedExecutor.execute(() -> {
+                                completableFuture.completeExceptionally(e.getCause());
+                                // release before handle request queue,
+                                //in order to client reconnect infinite loop
+                                tcLoadSemaphore.release();
+                                long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
+                                while (true) {
+                                    // prevent thread in a busy loop.
+                                    if (System.currentTimeMillis() < endTime) {
+                                        CompletableFuture<Void> future = deque.poll();
+                                        if (future != null) {
+                                            // this means that this tc client connection connect fail
+                                            future.completeExceptionally(e);
+                                        } else {
+                                            break;
+                                        }
+                                    } else {
+                                        deque.clear();
+                                        break;
+                                    }
+                                }
+                                LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
+                            });
+                            return null;
+                        });
                     } else {
                         // only one command can open transaction metadata store,
                         // other will be added to the deque, when the op of openTransactionMetadataStore finished
@@ -253,17 +265,13 @@ public class TransactionMetadataStoreService {
         return completableFuture;
     }
 
-    public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(TransactionCoordinatorID tcId) {
-        return pulsarService.getBrokerService()
-                .getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(v -> {
-                            TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId);
-                            TransactionRecoverTracker recoverTracker =
-                                    new TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
-                                    timeoutTracker, tcId.getId());
-                            return transactionMetadataStoreProvider
-                                    .openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
-                                            timeoutTracker, recoverTracker);
-                });
+    public CompletableFuture<TransactionMetadataStore>
+    openTransactionMetadataStore(TransactionCoordinatorID tcId,
+                                 TransactionTimeoutTracker timeoutTracker,
+                                 TransactionRecoverTracker recoverTracker) {
+        return pulsarService.getBrokerService().getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(
+                v -> transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
+                        timeoutTracker, recoverTracker));
     }
 
     public CompletableFuture<Void> removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandlerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandlerTest.java
index 543bb35c4ea..d7d0a2be00b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandlerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandlerTest.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 273a01850cb..d5275e30d79 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -119,9 +119,9 @@ public class MLTransactionMetadataStore
                                         + tcID.toString() + " change state to Ready error when init it"));
 
                     } else {
+                        completableFuture.complete(MLTransactionMetadataStore.this);
                         recoverTracker.handleCommittingAndAbortingTransaction();
                         timeoutTracker.start();
-                        completableFuture.complete(MLTransactionMetadataStore.this);
                     }
                 }