You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/25 08:25:19 UTC

[pulsar] branch branch-2.10 updated: [Transaction] Adopt Single_thread to handle TcClient connecting (#13969)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 66a465a  [Transaction] Adopt Single_thread to handle TcClient connecting (#13969)
66a465a is described below

commit 66a465a40864e46d464cdfcca7de4e8acfec4235
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Feb 25 16:14:52 2022 +0800

    [Transaction] Adopt Single_thread to handle TcClient connecting (#13969)
    
    ### Motivation
    
    The broker will only reconnect the same TC once at the same time, and other connection requests during the reconnection period will be processed together after the connection is completed.
    There may be concurrency problems in the queue for request addition and the clearing of the queue.
    
    ### Modification
    
    Use SingleThread to deal TcClient connecting.
    
    (cherry picked from commit 29259e1b5c33856cd6bd9413e331f4592fd3007c)
---
 .../org/apache/pulsar/broker/PulsarService.java    |   5 +
 .../broker/TransactionMetadataStoreService.java    | 152 ++++++++++++---------
 2 files changed, 91 insertions(+), 66 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 9089e1d..8f2bbd4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -424,6 +424,11 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 brokerAdditionalServlets = null;
             }
 
+            if (this.transactionMetadataStoreService != null) {
+                this.transactionMetadataStoreService.close();
+                this.transactionMetadataStoreService = null;
+            }
+
             GracefulExecutorServicesShutdown executorServicesShutdown =
                     GracefulExecutorServicesShutdown
                             .initiate()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 9b60679..b3ced3c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -24,6 +24,7 @@ import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.COMMITTI
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
@@ -32,7 +33,10 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -84,9 +88,14 @@ public class TransactionMetadataStoreService {
     // one connect request open the transactionMetaStore the other request will add to the queue, when the open op
     // finished the request will be poll and complete the future
     private final ConcurrentLongHashMap<ConcurrentLinkedDeque<CompletableFuture<Void>>> pendingConnectRequests;
+    private final ExecutorService internalPinnedExecutor;
 
     private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L;
 
+    private final ThreadFactory threadFactory =
+            new DefaultThreadFactory("transaction-coordinator-thread-factory");
+
+
     public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider,
                                            PulsarService pulsarService, TransactionBufferClient tbClient,
                                            HashedWheelTimer timer) {
@@ -98,6 +107,7 @@ public class TransactionMetadataStoreService {
         this.transactionOpRetryTimer = timer;
         this.tcLoadSemaphores = new ConcurrentLongHashMap<>();
         this.pendingConnectRequests = new ConcurrentLongHashMap<>();
+        this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
     }
 
     @Deprecated
@@ -152,80 +162,86 @@ public class TransactionMetadataStoreService {
     }
 
     public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
-        if (stores.get(tcId) != null) {
-            return CompletableFuture.completedFuture(null);
-        } else {
-            return pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
-                    .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString()).thenCompose(v -> {
-                        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-                final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
-                        .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1));
-                Deque<CompletableFuture<Void>> deque = pendingConnectRequests
-                        .computeIfAbsent(tcId.getId(), (id) -> new ConcurrentLinkedDeque<>());
-                if (tcLoadSemaphore.tryAcquire()) {
-                    // when tcLoadSemaphore.release(), this command will acquire semaphore, so we should jude the store
-                    // exist again.
-                    if (stores.get(tcId) != null) {
-                        return CompletableFuture.completedFuture(null);
-                    }
-
-                    openTransactionMetadataStore(tcId).thenAccept((store) -> {
-                        stores.put(tcId, store);
-                        LOG.info("Added new transaction meta store {}", tcId);
-                        long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
-                        while (true) {
-                            // prevent thread in a busy loop.
-                            if (System.currentTimeMillis() < endTime) {
-                                CompletableFuture<Void> future = deque.poll();
-                                if (future != null) {
-                                    // complete queue request future
-                                    future.complete(null);
-                                } else {
-                                    break;
-                                }
-                            } else {
-                                deque.clear();
-                                break;
-                            }
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (stores.get(tcId) != null) {
+                completableFuture.complete(null);
+            } else {
+                pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
+                        .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString())
+                        .thenRun(() -> internalPinnedExecutor.execute(() -> {
+                    final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
+                            .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1));
+                    Deque<CompletableFuture<Void>> deque = pendingConnectRequests
+                            .computeIfAbsent(tcId.getId(), (id) -> new ConcurrentLinkedDeque<>());
+                    if (tcLoadSemaphore.tryAcquire()) {
+                        // when tcLoadSemaphore.release(), this command will acquire semaphore,
+                        // so we should jude the store exist again.
+                        if (stores.get(tcId) != null) {
+                            completableFuture.complete(null);
                         }
 
-                        completableFuture.complete(null);
-                        tcLoadSemaphore.release();
-                    }).exceptionally(e -> {
-                        completableFuture.completeExceptionally(e.getCause());
-                        // release before handle request queue, in order to client reconnect infinite loop
-                        tcLoadSemaphore.release();
-                        long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
-                        while (true) {
-                            // prevent thread in a busy loop.
-                            if (System.currentTimeMillis() < endTime) {
-                                CompletableFuture<Void> future = deque.poll();
-                                if (future != null) {
-                                    // this means that this tc client connection connect fail
-                                    future.completeExceptionally(e);
+                        openTransactionMetadataStore(tcId).thenAccept((store) -> internalPinnedExecutor.execute(() -> {
+                            stores.put(tcId, store);
+                            LOG.info("Added new transaction meta store {}", tcId);
+                            long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
+                            while (true) {
+                                // prevent thread in a busy loop.
+                                if (System.currentTimeMillis() < endTime) {
+                                    CompletableFuture<Void> future = deque.poll();
+                                    if (future != null) {
+                                        // complete queue request future
+                                        future.complete(null);
+                                    } else {
+                                        break;
+                                    }
                                 } else {
+                                    deque.clear();
                                     break;
                                 }
-                            } else {
-                                deque.clear();
-                                break;
                             }
+
+                            completableFuture.complete(null);
+                            tcLoadSemaphore.release();
+                        })).exceptionally(e -> {
+                            internalPinnedExecutor.execute(() -> {
+                                        completableFuture.completeExceptionally(e.getCause());
+                                        // release before handle request queue,
+                                        //in order to client reconnect infinite loop
+                                        tcLoadSemaphore.release();
+                                        long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
+                                        while (true) {
+                                            // prevent thread in a busy loop.
+                                            if (System.currentTimeMillis() < endTime) {
+                                                CompletableFuture<Void> future = deque.poll();
+                                                if (future != null) {
+                                                    // this means that this tc client connection connect fail
+                                                    future.completeExceptionally(e);
+                                                } else {
+                                                    break;
+                                                }
+                                            } else {
+                                                deque.clear();
+                                                break;
+                                            }
+                                        }
+                                        LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
+                                    });
+                                    return null;
+                                });
+                    } else {
+                        // only one command can open transaction metadata store,
+                        // other will be added to the deque, when the op of openTransactionMetadataStore finished
+                        // then handle the requests witch in the queue
+                        deque.add(completableFuture);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Handle tc client connect added into pending queue! tcId : {}", tcId.toString());
                         }
-                        LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
-                        return null;
-                    });
-                } else {
-                    // only one command can open transaction metadata store,
-                    // other will be added to the deque, when the op of openTransactionMetadataStore finished
-                    // then handle the requests witch in the queue
-                    deque.add(completableFuture);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Handle tc client connect added into pending queue! tcId : {}", tcId.toString());
                     }
-                }
-                return completableFuture;
-            });
-        }
+                }));
+            }
+        });
+        return completableFuture;
     }
 
     public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(TransactionCoordinatorID tcId) {
@@ -537,4 +553,8 @@ public class TransactionMetadataStoreService {
     public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() {
         return Collections.unmodifiableMap(stores);
     }
+
+    public void close () {
+        this.internalPinnedExecutor.shutdown();
+    }
 }