You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rg...@apache.org on 2022/03/11 04:59:53 UTC

[pulsar] branch branch-2.9 updated: [Tests] Fix thread leak in MLTransactionMetadataStore (#14524)

This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new c3e65ea  [Tests] Fix thread leak in MLTransactionMetadataStore (#14524)
c3e65ea is described below

commit c3e65ea55af813571722d082126ecb295cac4355
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Mar 2 14:21:41 2022 +0200

    [Tests] Fix thread leak in MLTransactionMetadataStore (#14524)
    
    - MLTransactionMetadataStore.internalPinnedExecutor wasn't closed
      when MLTransactionMetadataStore.closeAsync was called
      - problem was introduced by #14238 changes
    
    - this issue causes tests to fail with OOME. Most likely this also impacts
      production code.
    
    * Close TransactionMetadataStoreService after the broker service has been closed
    
    (cherry picked from commit 0ddec86444bbdd94221032a3128f7dbe79934b93)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 16 +++++++-----
 .../broker/TransactionMetadataStoreService.java    | 12 ++++++++-
 .../coordinator/TransactionMetadataStoreState.java | 13 +++++++---
 .../impl/MLTransactionMetadataStore.java           | 29 +++++++++++++++-------
 4 files changed, 50 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 7fcf0d0..ba41784 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -403,11 +403,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 brokerAdditionalServlets = null;
             }
 
-            if (this.transactionMetadataStoreService != null) {
-                this.transactionMetadataStoreService.close();
-                this.transactionMetadataStoreService = null;
-            }
-
             GracefulExecutorServicesShutdown executorServicesShutdown =
                     GracefulExecutorServicesShutdown
                             .initiate()
@@ -426,7 +421,16 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 
             List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
             if (this.brokerService != null) {
-                asyncCloseFutures.add(this.brokerService.closeAsync());
+                CompletableFuture<Void> brokerCloseFuture = this.brokerService.closeAsync();
+                if (this.transactionMetadataStoreService != null) {
+                    asyncCloseFutures.add(brokerCloseFuture.whenComplete((__, ___) -> {
+                        // close transactionMetadataStoreService after the broker has been closed
+                        this.transactionMetadataStoreService.close();
+                        this.transactionMetadataStoreService = null;
+                    }));
+                } else {
+                    asyncCloseFutures.add(brokerCloseFuture);
+                }
                 this.brokerService = null;
             }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index b3ced3c..28bef1f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -554,7 +554,17 @@ public class TransactionMetadataStoreService {
         return Collections.unmodifiableMap(stores);
     }
 
-    public void close () {
+    public synchronized void close () {
         this.internalPinnedExecutor.shutdown();
+        stores.forEach((tcId, metadataStore) -> {
+            metadataStore.closeAsync().whenComplete((v, ex) -> {
+                if (ex != null) {
+                    LOG.error("Close transaction metadata store with id " + tcId, ex);
+                } else {
+                    LOG.info("Removed and closed transaction meta store {}", tcId);
+                }
+            });
+        });
+        stores.clear();
     }
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
index 5dbd9ba..8947413 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
@@ -33,6 +33,7 @@ public abstract class TransactionMetadataStoreState {
         None,
         Initializing,
         Ready,
+        Closing,
         Close
     }
 
@@ -55,10 +56,14 @@ public abstract class TransactionMetadataStoreState {
         return STATE_UPDATER.compareAndSet(this, State.None, State.Initializing);
     }
 
+    protected boolean changeToClosingState() {
+        return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Closing)
+                || STATE_UPDATER.compareAndSet(this, State.None, State.Closing)
+                || STATE_UPDATER.compareAndSet(this, State.Initializing, State.Closing));
+    }
+
     protected boolean changeToCloseState() {
-        return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Close)
-                || STATE_UPDATER.compareAndSet(this, State.None, State.Close)
-                || STATE_UPDATER.compareAndSet(this, State.Initializing, State.Close));
+        return STATE_UPDATER.compareAndSet(this, State.Closing, State.Close);
     }
 
     protected boolean checkIfReady() {
@@ -68,4 +73,4 @@ public abstract class TransactionMetadataStoreState {
     public State getState() {
         return STATE_UPDATER.get(this);
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index eabb5fb..19d651c 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -443,15 +445,24 @@ public class MLTransactionMetadataStore
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        return transactionLog.closeAsync().thenCompose(v -> {
-            txnMetaMap.clear();
-            this.timeoutTracker.close();
-            if (!this.changeToCloseState()) {
-                return FutureUtil.failedFuture(
-                        new IllegalStateException("Managed ledger transaction metadata store state to close error!"));
-            }
+        if (changeToClosingState()) {
+            // Disable new tasks from being submitted
+            internalPinnedExecutor.shutdown();
+            return transactionLog.closeAsync().thenCompose(v -> {
+                txnMetaMap.clear();
+                this.timeoutTracker.close();
+                if (!this.changeToCloseState()) {
+                    return FutureUtil.failedFuture(
+                            new IllegalStateException(
+                                    "Managed ledger transaction metadata store state to close error!"));
+                }
+                // Shutdown the ExecutorService
+                MoreExecutors.shutdownAndAwaitTermination(internalPinnedExecutor, Duration.ofSeconds(5L));
+                return CompletableFuture.completedFuture(null);
+            });
+        } else {
             return CompletableFuture.completedFuture(null);
-        });
+        }
     }
 
     @Override
@@ -505,4 +516,4 @@ public class MLTransactionMetadataStore
     public ManagedLedger getManagedLedger() {
         return this.transactionLog.getManagedLedger();
     }
-}
\ No newline at end of file
+}