You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/25 07:58:28 UTC
[pulsar] 09/13: [Transaction] Adopt single thread pool in TC (#14238)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit acd934106182a8c250155f09842d3119c85e02d6
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Feb 24 21:57:38 2022 +0800
[Transaction] Adopt single thread pool in TC (#14238)
### Motivation
Optimize code and improve maintainability.
### Modification
* Option 1 (the approach taken in this change)
Create a single-threaded pool per TC.
* advantage
Each TC has its own single-threaded pool to perform its own tasks, so it will not be blocked by sharing a single thread with other TCs.
* disadvantage
Too many thread pools may be created.
* Option 2
Create an ExecuteProvider in the TC service. It creates some single-threaded pools when the TC service is created, and then assigns a single-threaded pool to each TC when that TC is created.
* The advantages and disadvantages are the opposite of those of Option 1.
(cherry picked from commit ced57866700aaeae163bcc6670d9a8eb1ffe8c50)
---
.../impl/MLTransactionMetadataStore.java | 328 ++++++++++++---------
1 file changed, 188 insertions(+), 140 deletions(-)
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index a71d203..f109ec4 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -18,12 +18,15 @@
*/
package org.apache.pulsar.transaction.coordinator.impl;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
@@ -69,6 +72,7 @@ public class MLTransactionMetadataStore
private final LongAdder transactionTimeoutCount;
private final LongAdder appendLogCount;
private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
+ private final ExecutorService internalPinnedExecutor;
public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
MLTransactionLogImpl mlTransactionLog,
@@ -87,12 +91,16 @@ public class MLTransactionMetadataStore
this.abortedTransactionCount = new LongAdder();
this.transactionTimeoutCount = new LongAdder();
this.appendLogCount = new LongAdder();
+ DefaultThreadFactory threadFactory = new DefaultThreadFactory("transaction_coordinator_"
+ + tcID.toString() + "thread_factory");
+ this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
if (!changeToInitializingState()) {
log.error("Managed ledger transaction metadata store change state error when init it");
return;
}
- new Thread(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
+
+ internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
@Override
public void replayComplete() {
@@ -125,7 +133,8 @@ public class MLTransactionMetadataStore
long timeoutAt = transactionMetadataEntry.getTimeoutMs();
txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID,
openTimestamp, timeoutAt), positions));
- recoverTracker.handleOpenStatusTransaction(txnSequenceId, timeoutAt + openTimestamp);
+ recoverTracker.handleOpenStatusTransaction(txnSequenceId,
+ timeoutAt + openTimestamp);
}
break;
case ADD_PARTITION:
@@ -174,7 +183,7 @@ public class MLTransactionMetadataStore
log.error(e.getMessage(), e);
}
}
- })).start();
+ }));
}
@Override
@@ -195,167 +204,206 @@ public class MLTransactionMetadataStore
}
@Override
- public synchronized CompletableFuture<TxnID> newTransaction(long timeOut) {
- if (!checkIfReady()) {
- return FutureUtil.failedFuture(
- new CoordinatorException
- .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
- }
+ public CompletableFuture<TxnID> newTransaction(long timeOut) {
+ CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
+ internalPinnedExecutor.execute(() -> {
+ if (!checkIfReady()) {
+ completableFuture.completeExceptionally(new CoordinatorException
+ .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
+ return;
+ }
- long mostSigBits = tcID.getId();
- long leastSigBits = sequenceIdGenerator.generateSequenceId();
- TxnID txnID = new TxnID(mostSigBits, leastSigBits);
- long currentTimeMillis = System.currentTimeMillis();
- TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
- .setTxnidMostBits(mostSigBits)
- .setTxnidLeastBits(leastSigBits)
- .setStartTime(currentTimeMillis)
- .setTimeoutMs(timeOut)
- .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
- .setLastModificationTime(currentTimeMillis)
- .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
- return transactionLog.append(transactionMetadataEntry)
- .thenCompose(position -> {
- appendLogCount.increment();
- TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
- List<Position> positions = new ArrayList<>();
- positions.add(position);
- Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
- txnMetaMap.put(leastSigBits, pair);
- this.timeoutTracker.addTransaction(leastSigBits, timeOut);
- createdTransactionCount.increment();
- return CompletableFuture.completedFuture(txnID);
- });
+ long mostSigBits = tcID.getId();
+ long leastSigBits = sequenceIdGenerator.generateSequenceId();
+ TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+ long currentTimeMillis = System.currentTimeMillis();
+ TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+ .setTxnidMostBits(mostSigBits)
+ .setTxnidLeastBits(leastSigBits)
+ .setStartTime(currentTimeMillis)
+ .setTimeoutMs(timeOut)
+ .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
+ .setLastModificationTime(currentTimeMillis)
+ .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+ transactionLog.append(transactionMetadataEntry)
+ .whenComplete((position, throwable) -> {
+ if (throwable != null) {
+ completableFuture.completeExceptionally(throwable);
+ } else {
+ appendLogCount.increment();
+ TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
+ List<Position> positions = new ArrayList<>();
+ positions.add(position);
+ Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
+ txnMetaMap.put(leastSigBits, pair);
+ this.timeoutTracker.addTransaction(leastSigBits, timeOut);
+ createdTransactionCount.increment();
+ completableFuture.complete(txnID);
+ }
+ });
+ });
+ return completableFuture;
}
@Override
- public synchronized CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
- if (!checkIfReady()) {
- return FutureUtil.failedFuture(
- new CoordinatorException.TransactionMetadataStoreStateException(tcID,
- State.Ready, getState(), "add produced partition"));
- }
- return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
- TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
- .setTxnidMostBits(txnID.getMostSigBits())
- .setTxnidLeastBits(txnID.getLeastSigBits())
- .setMetadataOp(TransactionMetadataOp.ADD_PARTITION)
- .addAllPartitions(partitions)
- .setLastModificationTime(System.currentTimeMillis())
- .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+ public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ internalPinnedExecutor.execute(() -> {
+ if (!checkIfReady()) {
+ completableFuture
+ .completeExceptionally(new CoordinatorException.TransactionMetadataStoreStateException(tcID,
+ State.Ready, getState(), "add produced partition"));
+ return;
+ }
+ getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+ TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+ .setTxnidMostBits(txnID.getMostSigBits())
+ .setTxnidLeastBits(txnID.getLeastSigBits())
+ .setMetadataOp(TransactionMetadataOp.ADD_PARTITION)
+ .addAllPartitions(partitions)
+ .setLastModificationTime(System.currentTimeMillis())
+ .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
- return transactionLog.append(transactionMetadataEntry)
- .thenCompose(position -> {
- appendLogCount.increment();
- try {
- synchronized (txnMetaListPair.getLeft()) {
- txnMetaListPair.getLeft().addProducedPartitions(partitions);
- txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+ transactionLog.append(transactionMetadataEntry)
+ .whenComplete((position, exception) -> {
+ if (exception != null) {
+ completableFuture.completeExceptionally(exception);
+ return;
}
- return CompletableFuture.completedFuture(null);
- } catch (InvalidTxnStatusException e) {
- transactionLog.deletePosition(Collections.singletonList(position));
- log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
- + " add produced partition error with TxnStatus : "
- + txnMetaListPair.getLeft().status().name(), e);
- return FutureUtil.failedFuture(e);
- }
- });
+ appendLogCount.increment();
+ try {
+ synchronized (txnMetaListPair.getLeft()) {
+ txnMetaListPair.getLeft().addProducedPartitions(partitions);
+ txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+ }
+ completableFuture.complete(null);
+ } catch (InvalidTxnStatusException e) {
+ transactionLog.deletePosition(Collections.singletonList(position));
+ log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+ + " add produced partition error with TxnStatus : "
+ + txnMetaListPair.getLeft().status().name(), e);
+ completableFuture.completeExceptionally(e);
+ }
+ });
+ });
});
+ return completableFuture;
}
@Override
- public synchronized CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
+ public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
List<TransactionSubscription> txnSubscriptions) {
- if (!checkIfReady()) {
- return FutureUtil.failedFuture(
- new CoordinatorException.TransactionMetadataStoreStateException(tcID,
- State.Ready, getState(), "add acked partition"));
- }
- return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
- TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
- .setTxnidMostBits(txnID.getMostSigBits())
- .setTxnidLeastBits(txnID.getLeastSigBits())
- .setMetadataOp(TransactionMetadataOp.ADD_SUBSCRIPTION)
- .addAllSubscriptions(txnSubscriptionToSubscription(txnSubscriptions))
- .setLastModificationTime(System.currentTimeMillis())
- .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ internalPinnedExecutor.execute(() -> {
+ if (!checkIfReady()) {
+ completableFuture.completeExceptionally(new CoordinatorException
+ .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "add acked partition"));
+ return;
+ }
+ getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+ TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+ .setTxnidMostBits(txnID.getMostSigBits())
+ .setTxnidLeastBits(txnID.getLeastSigBits())
+ .setMetadataOp(TransactionMetadataOp.ADD_SUBSCRIPTION)
+ .addAllSubscriptions(txnSubscriptionToSubscription(txnSubscriptions))
+ .setLastModificationTime(System.currentTimeMillis())
+ .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
- return transactionLog.append(transactionMetadataEntry)
- .thenCompose(position -> {
- appendLogCount.increment();
- try {
- synchronized (txnMetaListPair.getLeft()) {
- txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
- txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+ transactionLog.append(transactionMetadataEntry)
+ .whenComplete((position, exception) -> {
+ if (exception != null) {
+ completableFuture.completeExceptionally(exception);
+ return;
}
- return CompletableFuture.completedFuture(null);
- } catch (InvalidTxnStatusException e) {
- transactionLog.deletePosition(Collections.singletonList(position));
- log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
- + " add acked subscription error with TxnStatus : "
- + txnMetaListPair.getLeft().status().name(), e);
- return FutureUtil.failedFuture(e);
- }
- });
+ appendLogCount.increment();
+ try {
+ synchronized (txnMetaListPair.getLeft()) {
+ txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
+ txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+ }
+ completableFuture.complete(null);
+ } catch (InvalidTxnStatusException e) {
+ transactionLog.deletePosition(Collections.singletonList(position));
+ log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+ + " add acked subscription error with TxnStatus : "
+ + txnMetaListPair.getLeft().status().name(), e);
+ completableFuture.completeExceptionally(e);
+ }
+ });
+ });
});
+ return completableFuture;
}
@Override
- public synchronized CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
+ public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
TxnStatus expectedStatus, boolean isTimeout) {
- if (!checkIfReady()) {
- return FutureUtil.failedFuture(
- new CoordinatorException.TransactionMetadataStoreStateException(tcID,
- State.Ready, getState(), "update transaction status"));
- }
- return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
- if (txnMetaListPair.getLeft().status() == newStatus) {
- return CompletableFuture.completedFuture(null);
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ internalPinnedExecutor.execute(() -> {
+ if (!checkIfReady()) {
+ completableFuture.completeExceptionally(new CoordinatorException
+ .TransactionMetadataStoreStateException(tcID,
+ State.Ready, getState(), "update transaction status"));
+ return;
}
- TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
- .setTxnidMostBits(txnID.getMostSigBits())
- .setTxnidLeastBits(txnID.getLeastSigBits())
- .setExpectedStatus(expectedStatus)
- .setMetadataOp(TransactionMetadataOp.UPDATE)
- .setLastModificationTime(System.currentTimeMillis())
- .setNewStatus(newStatus)
- .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+ getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+ if (txnMetaListPair.getLeft().status() == newStatus) {
+ completableFuture.complete(null);
+ return;
+ }
+ TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+ .setTxnidMostBits(txnID.getMostSigBits())
+ .setTxnidLeastBits(txnID.getLeastSigBits())
+ .setExpectedStatus(expectedStatus)
+ .setMetadataOp(TransactionMetadataOp.UPDATE)
+ .setLastModificationTime(System.currentTimeMillis())
+ .setNewStatus(newStatus)
+ .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
- return transactionLog.append(transactionMetadataEntry).thenCompose(position -> {
- appendLogCount.increment();
- try {
- synchronized (txnMetaListPair.getLeft()) {
- txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);
- txnMetaListPair.getRight().add(position);
+ transactionLog.append(transactionMetadataEntry).whenComplete((position, throwable) -> {
+ if (throwable != null) {
+ completableFuture.completeExceptionally(throwable);
+ return;
}
- if (newStatus == TxnStatus.ABORTING && isTimeout) {
- this.transactionTimeoutCount.increment();
- }
- if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
- return transactionLog.deletePosition(txnMetaListPair.getRight()).thenCompose(v -> {
- this.transactionMetadataStoreStats
- .addTransactionExecutionLatencySample(System.currentTimeMillis()
- - txnMetaListPair.getLeft().getOpenTimestamp());
- if (newStatus == TxnStatus.COMMITTED) {
- committedTransactionCount.increment();
- } else {
- abortedTransactionCount.increment();
- }
- txnMetaMap.remove(txnID.getLeastSigBits());
- return CompletableFuture.completedFuture(null);
- });
+ appendLogCount.increment();
+ try {
+ synchronized (txnMetaListPair.getLeft()) {
+ txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);
+ txnMetaListPair.getRight().add(position);
+ }
+ if (newStatus == TxnStatus.ABORTING && isTimeout) {
+ this.transactionTimeoutCount.increment();
+ }
+ if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
+ transactionLog.deletePosition(txnMetaListPair.getRight()).whenComplete((v, exception) -> {
+ if (exception != null) {
+ completableFuture.completeExceptionally(exception);
+ return;
+ }
+ this.transactionMetadataStoreStats
+ .addTransactionExecutionLatencySample(System.currentTimeMillis()
+ - txnMetaListPair.getLeft().getOpenTimestamp());
+ if (newStatus == TxnStatus.COMMITTED) {
+ committedTransactionCount.increment();
+ } else {
+ abortedTransactionCount.increment();
+ }
+ txnMetaMap.remove(txnID.getLeastSigBits());
+ completableFuture.complete(null);
+ });
+ }
+ completableFuture.complete(null);
+ } catch (InvalidTxnStatusException e) {
+ transactionLog.deletePosition(Collections.singletonList(position));
+ log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+ + " add update txn status error with TxnStatus : "
+ + txnMetaListPair.getLeft().status().name(), e);
+ completableFuture.completeExceptionally(e);
}
- return CompletableFuture.completedFuture(null);
- } catch (InvalidTxnStatusException e) {
- transactionLog.deletePosition(Collections.singletonList(position));
- log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
- + " add update txn status error with TxnStatus : "
- + txnMetaListPair.getLeft().status().name(), e);
- return FutureUtil.failedFuture(e);
- }
+ });
});
});
+ return completableFuture;
}
@Override