You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/25 07:58:28 UTC

[pulsar] 09/13: [Transaction] Adopt single thread pool in TC (#14238)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit acd934106182a8c250155f09842d3119c85e02d6
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Feb 24 21:57:38 2022 +0800

    [Transaction] Adopt single thread pool in TC (#14238)
    
    ### Motivation
    Optimize code and improve maintainability.
    ### Modification
    * Option 1 (the way I use)
    Create a thread pool at peer TC.
      * advantage
      Each TC has a single thread pool to perform its own tasks, and will not be blocked due to sharing a single  thread with other TCs
      * disadvantage
      Too many thread pools may be created
    * Option 2
    Create an ExecuteProvider in the TC service.  It create some single-threaded pools when the TC Service is created, and  then assign a single-threaded pool to TC when the TC is created
       * The advantages and disadvantages are opposite to the option one
    
    (cherry picked from commit ced57866700aaeae163bcc6670d9a8eb1ffe8c50)
---
 .../impl/MLTransactionMetadataStore.java           | 328 ++++++++++++---------
 1 file changed, 188 insertions(+), 140 deletions(-)

diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index a71d203..f109ec4 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -18,12 +18,15 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
@@ -69,6 +72,7 @@ public class MLTransactionMetadataStore
     private final LongAdder transactionTimeoutCount;
     private final LongAdder appendLogCount;
     private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
+    private final ExecutorService internalPinnedExecutor;
 
     public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
                                       MLTransactionLogImpl mlTransactionLog,
@@ -87,12 +91,16 @@ public class MLTransactionMetadataStore
         this.abortedTransactionCount = new LongAdder();
         this.transactionTimeoutCount = new LongAdder();
         this.appendLogCount = new LongAdder();
+        DefaultThreadFactory threadFactory = new DefaultThreadFactory("transaction_coordinator_"
+                + tcID.toString() + "thread_factory");
+        this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
 
         if (!changeToInitializingState()) {
             log.error("Managed ledger transaction metadata store change state error when init it");
             return;
         }
-        new Thread(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
+
+        internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
 
             @Override
             public void replayComplete() {
@@ -125,7 +133,8 @@ public class MLTransactionMetadataStore
                                 long timeoutAt = transactionMetadataEntry.getTimeoutMs();
                                 txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID,
                                         openTimestamp, timeoutAt), positions));
-                                recoverTracker.handleOpenStatusTransaction(txnSequenceId, timeoutAt + openTimestamp);
+                                recoverTracker.handleOpenStatusTransaction(txnSequenceId,
+                                        timeoutAt + openTimestamp);
                             }
                             break;
                         case ADD_PARTITION:
@@ -174,7 +183,7 @@ public class MLTransactionMetadataStore
                     log.error(e.getMessage(), e);
                 }
             }
-        })).start();
+        }));
     }
 
     @Override
@@ -195,167 +204,206 @@ public class MLTransactionMetadataStore
     }
 
     @Override
-    public synchronized CompletableFuture<TxnID> newTransaction(long timeOut) {
-        if (!checkIfReady()) {
-            return FutureUtil.failedFuture(
-                    new CoordinatorException
-                            .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
-        }
+    public CompletableFuture<TxnID> newTransaction(long timeOut) {
+        CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                completableFuture.completeExceptionally(new CoordinatorException
+                        .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
+                return;
+            }
 
-        long mostSigBits = tcID.getId();
-        long leastSigBits = sequenceIdGenerator.generateSequenceId();
-        TxnID txnID = new TxnID(mostSigBits, leastSigBits);
-        long currentTimeMillis = System.currentTimeMillis();
-        TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
-                .setTxnidMostBits(mostSigBits)
-                .setTxnidLeastBits(leastSigBits)
-                .setStartTime(currentTimeMillis)
-                .setTimeoutMs(timeOut)
-                .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
-                .setLastModificationTime(currentTimeMillis)
-                .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
-        return transactionLog.append(transactionMetadataEntry)
-                .thenCompose(position -> {
-                    appendLogCount.increment();
-                    TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
-                    List<Position> positions = new ArrayList<>();
-                    positions.add(position);
-                    Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
-                    txnMetaMap.put(leastSigBits, pair);
-                    this.timeoutTracker.addTransaction(leastSigBits, timeOut);
-                    createdTransactionCount.increment();
-                    return CompletableFuture.completedFuture(txnID);
-                });
+            long mostSigBits = tcID.getId();
+            long leastSigBits = sequenceIdGenerator.generateSequenceId();
+            TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+            long currentTimeMillis = System.currentTimeMillis();
+            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+                    .setTxnidMostBits(mostSigBits)
+                    .setTxnidLeastBits(leastSigBits)
+                    .setStartTime(currentTimeMillis)
+                    .setTimeoutMs(timeOut)
+                    .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
+                    .setLastModificationTime(currentTimeMillis)
+                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+            transactionLog.append(transactionMetadataEntry)
+                    .whenComplete((position, throwable) -> {
+                        if (throwable != null) {
+                            completableFuture.completeExceptionally(throwable);
+                        } else {
+                            appendLogCount.increment();
+                            TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
+                            List<Position> positions = new ArrayList<>();
+                            positions.add(position);
+                            Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
+                            txnMetaMap.put(leastSigBits, pair);
+                            this.timeoutTracker.addTransaction(leastSigBits, timeOut);
+                            createdTransactionCount.increment();
+                            completableFuture.complete(txnID);
+                        }
+                    });
+        });
+        return completableFuture;
     }
 
     @Override
-    public synchronized CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
-        if (!checkIfReady()) {
-            return FutureUtil.failedFuture(
-                    new CoordinatorException.TransactionMetadataStoreStateException(tcID,
-                            State.Ready, getState(), "add produced partition"));
-        }
-        return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
-                    .setTxnidMostBits(txnID.getMostSigBits())
-                    .setTxnidLeastBits(txnID.getLeastSigBits())
-                    .setMetadataOp(TransactionMetadataOp.ADD_PARTITION)
-                    .addAllPartitions(partitions)
-                    .setLastModificationTime(System.currentTimeMillis())
-                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                completableFuture
+                        .completeExceptionally(new CoordinatorException.TransactionMetadataStoreStateException(tcID,
+                        State.Ready, getState(), "add produced partition"));
+                return;
+            }
+            getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+                TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+                        .setTxnidMostBits(txnID.getMostSigBits())
+                        .setTxnidLeastBits(txnID.getLeastSigBits())
+                        .setMetadataOp(TransactionMetadataOp.ADD_PARTITION)
+                        .addAllPartitions(partitions)
+                        .setLastModificationTime(System.currentTimeMillis())
+                        .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
-            return transactionLog.append(transactionMetadataEntry)
-                    .thenCompose(position -> {
-                        appendLogCount.increment();
-                        try {
-                            synchronized (txnMetaListPair.getLeft()) {
-                                txnMetaListPair.getLeft().addProducedPartitions(partitions);
-                                txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+                transactionLog.append(transactionMetadataEntry)
+                        .whenComplete((position, exception) -> {
+                            if (exception != null) {
+                                completableFuture.completeExceptionally(exception);
+                                return;
                             }
-                            return CompletableFuture.completedFuture(null);
-                        } catch (InvalidTxnStatusException e) {
-                            transactionLog.deletePosition(Collections.singletonList(position));
-                            log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
-                                    + " add produced partition error with TxnStatus : "
-                                    + txnMetaListPair.getLeft().status().name(), e);
-                            return FutureUtil.failedFuture(e);
-                        }
-                    });
+                            appendLogCount.increment();
+                            try {
+                                synchronized (txnMetaListPair.getLeft()) {
+                                    txnMetaListPair.getLeft().addProducedPartitions(partitions);
+                                    txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+                                }
+                                completableFuture.complete(null);
+                            } catch (InvalidTxnStatusException e) {
+                                transactionLog.deletePosition(Collections.singletonList(position));
+                                log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+                                        + " add produced partition error with TxnStatus : "
+                                        + txnMetaListPair.getLeft().status().name(), e);
+                                completableFuture.completeExceptionally(e);
+                            }
+                        });
+            });
         });
+        return completableFuture;
     }
 
     @Override
-    public synchronized CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
+    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
                                                           List<TransactionSubscription> txnSubscriptions) {
-        if (!checkIfReady()) {
-            return FutureUtil.failedFuture(
-                    new CoordinatorException.TransactionMetadataStoreStateException(tcID,
-                            State.Ready, getState(), "add acked partition"));
-        }
-        return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
-                    .setTxnidMostBits(txnID.getMostSigBits())
-                    .setTxnidLeastBits(txnID.getLeastSigBits())
-                    .setMetadataOp(TransactionMetadataOp.ADD_SUBSCRIPTION)
-                    .addAllSubscriptions(txnSubscriptionToSubscription(txnSubscriptions))
-                    .setLastModificationTime(System.currentTimeMillis())
-                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                completableFuture.completeExceptionally(new CoordinatorException
+                        .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "add acked partition"));
+                return;
+            }
+            getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+                TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+                        .setTxnidMostBits(txnID.getMostSigBits())
+                        .setTxnidLeastBits(txnID.getLeastSigBits())
+                        .setMetadataOp(TransactionMetadataOp.ADD_SUBSCRIPTION)
+                        .addAllSubscriptions(txnSubscriptionToSubscription(txnSubscriptions))
+                        .setLastModificationTime(System.currentTimeMillis())
+                        .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
-            return transactionLog.append(transactionMetadataEntry)
-                    .thenCompose(position -> {
-                        appendLogCount.increment();
-                        try {
-                            synchronized (txnMetaListPair.getLeft()) {
-                                txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
-                                txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+                transactionLog.append(transactionMetadataEntry)
+                        .whenComplete((position, exception) -> {
+                            if (exception != null) {
+                                completableFuture.completeExceptionally(exception);
+                                return;
                             }
-                            return CompletableFuture.completedFuture(null);
-                        } catch (InvalidTxnStatusException e) {
-                            transactionLog.deletePosition(Collections.singletonList(position));
-                            log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
-                                    + " add acked subscription error with TxnStatus : "
-                                    + txnMetaListPair.getLeft().status().name(), e);
-                            return FutureUtil.failedFuture(e);
-                        }
-                    });
+                            appendLogCount.increment();
+                            try {
+                                synchronized (txnMetaListPair.getLeft()) {
+                                    txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
+                                    txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+                                }
+                                completableFuture.complete(null);
+                            } catch (InvalidTxnStatusException e) {
+                                transactionLog.deletePosition(Collections.singletonList(position));
+                                log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+                                        + " add acked subscription error with TxnStatus : "
+                                        + txnMetaListPair.getLeft().status().name(), e);
+                                completableFuture.completeExceptionally(e);
+                            }
+                        });
+            });
         });
+        return completableFuture;
     }
 
     @Override
-    public synchronized CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
+    public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
                                                                 TxnStatus expectedStatus, boolean isTimeout) {
-        if (!checkIfReady()) {
-            return FutureUtil.failedFuture(
-                    new CoordinatorException.TransactionMetadataStoreStateException(tcID,
-                            State.Ready, getState(), "update transaction status"));
-        }
-        return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
-            if (txnMetaListPair.getLeft().status() == newStatus) {
-                return CompletableFuture.completedFuture(null);
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                completableFuture.completeExceptionally(new CoordinatorException
+                        .TransactionMetadataStoreStateException(tcID,
+                        State.Ready, getState(), "update transaction status"));
+                return;
             }
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
-                    .setTxnidMostBits(txnID.getMostSigBits())
-                    .setTxnidLeastBits(txnID.getLeastSigBits())
-                    .setExpectedStatus(expectedStatus)
-                    .setMetadataOp(TransactionMetadataOp.UPDATE)
-                    .setLastModificationTime(System.currentTimeMillis())
-                    .setNewStatus(newStatus)
-                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+            getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+                if (txnMetaListPair.getLeft().status() == newStatus) {
+                    completableFuture.complete(null);
+                    return;
+                }
+                TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+                        .setTxnidMostBits(txnID.getMostSigBits())
+                        .setTxnidLeastBits(txnID.getLeastSigBits())
+                        .setExpectedStatus(expectedStatus)
+                        .setMetadataOp(TransactionMetadataOp.UPDATE)
+                        .setLastModificationTime(System.currentTimeMillis())
+                        .setNewStatus(newStatus)
+                        .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
-            return transactionLog.append(transactionMetadataEntry).thenCompose(position -> {
-                appendLogCount.increment();
-                try {
-                    synchronized (txnMetaListPair.getLeft()) {
-                        txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);
-                        txnMetaListPair.getRight().add(position);
+                transactionLog.append(transactionMetadataEntry).whenComplete((position, throwable) -> {
+                    if (throwable != null) {
+                        completableFuture.completeExceptionally(throwable);
+                        return;
                     }
-                    if (newStatus == TxnStatus.ABORTING && isTimeout) {
-                        this.transactionTimeoutCount.increment();
-                    }
-                    if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
-                        return transactionLog.deletePosition(txnMetaListPair.getRight()).thenCompose(v -> {
-                            this.transactionMetadataStoreStats
-                                    .addTransactionExecutionLatencySample(System.currentTimeMillis()
-                                            - txnMetaListPair.getLeft().getOpenTimestamp());
-                            if (newStatus == TxnStatus.COMMITTED) {
-                                committedTransactionCount.increment();
-                            } else {
-                                abortedTransactionCount.increment();
-                            }
-                            txnMetaMap.remove(txnID.getLeastSigBits());
-                            return CompletableFuture.completedFuture(null);
-                        });
+                    appendLogCount.increment();
+                    try {
+                        synchronized (txnMetaListPair.getLeft()) {
+                            txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);
+                            txnMetaListPair.getRight().add(position);
+                        }
+                        if (newStatus == TxnStatus.ABORTING && isTimeout) {
+                            this.transactionTimeoutCount.increment();
+                        }
+                        if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
+                            transactionLog.deletePosition(txnMetaListPair.getRight()).whenComplete((v, exception) -> {
+                                if (exception != null) {
+                                    completableFuture.completeExceptionally(exception);
+                                    return;
+                                }
+                                this.transactionMetadataStoreStats
+                                        .addTransactionExecutionLatencySample(System.currentTimeMillis()
+                                                - txnMetaListPair.getLeft().getOpenTimestamp());
+                                if (newStatus == TxnStatus.COMMITTED) {
+                                    committedTransactionCount.increment();
+                                } else {
+                                    abortedTransactionCount.increment();
+                                }
+                                txnMetaMap.remove(txnID.getLeastSigBits());
+                                completableFuture.complete(null);
+                            });
+                        }
+                        completableFuture.complete(null);
+                    } catch (InvalidTxnStatusException e) {
+                        transactionLog.deletePosition(Collections.singletonList(position));
+                        log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+                                + " add update txn status error with TxnStatus : "
+                                + txnMetaListPair.getLeft().status().name(), e);
+                        completableFuture.completeExceptionally(e);
                     }
-                    return CompletableFuture.completedFuture(null);
-                } catch (InvalidTxnStatusException e) {
-                    transactionLog.deletePosition(Collections.singletonList(position));
-                    log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
-                            + " add update txn status error with TxnStatus : "
-                            + txnMetaListPair.getLeft().status().name(), e);
-                    return FutureUtil.failedFuture(e);
-                }
+                });
             });
         });
+       return completableFuture;
     }
 
     @Override