You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rg...@apache.org on 2022/03/11 04:59:53 UTC
[pulsar] branch branch-2.9 updated: [Tests] Fix thread leak in MLTransactionMetadataStore (#14524)
This is an automated email from the ASF dual-hosted git repository.
rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new c3e65ea [Tests] Fix thread leak in MLTransactionMetadataStore (#14524)
c3e65ea is described below
commit c3e65ea55af813571722d082126ecb295cac4355
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Mar 2 14:21:41 2022 +0200
[Tests] Fix thread leak in MLTransactionMetadataStore (#14524)
- MLTransactionMetadataStore.internalPinnedExecutor wasn't closed
when MLTransactionMetadataStore.closeAsync was called
- problem was introduced by #14238 changes
- this issue causes tests to fail with OOME. Most likely this also impacts
production code.
* Close TransactionMetadataStoreService after the broker service has been closed
(cherry picked from commit 0ddec86444bbdd94221032a3128f7dbe79934b93)
---
.../org/apache/pulsar/broker/PulsarService.java | 16 +++++++-----
.../broker/TransactionMetadataStoreService.java | 12 ++++++++-
.../coordinator/TransactionMetadataStoreState.java | 13 +++++++---
.../impl/MLTransactionMetadataStore.java | 29 +++++++++++++++-------
4 files changed, 50 insertions(+), 20 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 7fcf0d0..ba41784 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -403,11 +403,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
brokerAdditionalServlets = null;
}
- if (this.transactionMetadataStoreService != null) {
- this.transactionMetadataStoreService.close();
- this.transactionMetadataStoreService = null;
- }
-
GracefulExecutorServicesShutdown executorServicesShutdown =
GracefulExecutorServicesShutdown
.initiate()
@@ -426,7 +421,16 @@ public class PulsarService implements AutoCloseable, ShutdownService {
List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
if (this.brokerService != null) {
- asyncCloseFutures.add(this.brokerService.closeAsync());
+ CompletableFuture<Void> brokerCloseFuture = this.brokerService.closeAsync();
+ if (this.transactionMetadataStoreService != null) {
+ asyncCloseFutures.add(brokerCloseFuture.whenComplete((__, ___) -> {
+ // close transactionMetadataStoreService after the broker has been closed
+ this.transactionMetadataStoreService.close();
+ this.transactionMetadataStoreService = null;
+ }));
+ } else {
+ asyncCloseFutures.add(brokerCloseFuture);
+ }
this.brokerService = null;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index b3ced3c..28bef1f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -554,7 +554,17 @@ public class TransactionMetadataStoreService {
return Collections.unmodifiableMap(stores);
}
- public void close () {
+ public synchronized void close () {
this.internalPinnedExecutor.shutdown();
+ stores.forEach((tcId, metadataStore) -> {
+ metadataStore.closeAsync().whenComplete((v, ex) -> {
+ if (ex != null) {
+ LOG.error("Close transaction metadata store with id " + tcId, ex);
+ } else {
+ LOG.info("Removed and closed transaction meta store {}", tcId);
+ }
+ });
+ });
+ stores.clear();
}
}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
index 5dbd9ba..8947413 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
@@ -33,6 +33,7 @@ public abstract class TransactionMetadataStoreState {
None,
Initializing,
Ready,
+ Closing,
Close
}
@@ -55,10 +56,14 @@ public abstract class TransactionMetadataStoreState {
return STATE_UPDATER.compareAndSet(this, State.None, State.Initializing);
}
+ protected boolean changeToClosingState() {
+ return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Closing)
+ || STATE_UPDATER.compareAndSet(this, State.None, State.Closing)
+ || STATE_UPDATER.compareAndSet(this, State.Initializing, State.Closing));
+ }
+
protected boolean changeToCloseState() {
- return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Close)
- || STATE_UPDATER.compareAndSet(this, State.None, State.Close)
- || STATE_UPDATER.compareAndSet(this, State.Initializing, State.Close));
+ return STATE_UPDATER.compareAndSet(this, State.Closing, State.Close);
}
protected boolean checkIfReady() {
@@ -68,4 +73,4 @@ public abstract class TransactionMetadataStoreState {
public State getState() {
return STATE_UPDATER.get(this);
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index eabb5fb..19d651c 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.transaction.coordinator.impl;
+import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -443,15 +445,24 @@ public class MLTransactionMetadataStore
@Override
public CompletableFuture<Void> closeAsync() {
- return transactionLog.closeAsync().thenCompose(v -> {
- txnMetaMap.clear();
- this.timeoutTracker.close();
- if (!this.changeToCloseState()) {
- return FutureUtil.failedFuture(
- new IllegalStateException("Managed ledger transaction metadata store state to close error!"));
- }
+ if (changeToClosingState()) {
+ // Disable new tasks from being submitted
+ internalPinnedExecutor.shutdown();
+ return transactionLog.closeAsync().thenCompose(v -> {
+ txnMetaMap.clear();
+ this.timeoutTracker.close();
+ if (!this.changeToCloseState()) {
+ return FutureUtil.failedFuture(
+ new IllegalStateException(
+ "Managed ledger transaction metadata store state to close error!"));
+ }
+ // Shutdown the ExecutorService
+ MoreExecutors.shutdownAndAwaitTermination(internalPinnedExecutor, Duration.ofSeconds(5L));
+ return CompletableFuture.completedFuture(null);
+ });
+ } else {
return CompletableFuture.completedFuture(null);
- });
+ }
}
@Override
@@ -505,4 +516,4 @@ public class MLTransactionMetadataStore
public ManagedLedger getManagedLedger() {
return this.transactionLog.getManagedLedger();
}
-}
\ No newline at end of file
+}