You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/04/26 20:17:59 UTC

(pulsar) branch master updated: [improve][broker] Don't use forkjoin pool by default for deleting partitioned topics (#22598)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8323a3c4991 [improve][broker] Don't use forkjoin pool by default for deleting partitioned topics (#22598)
8323a3c4991 is described below

commit 8323a3c49912976aee723787fa67bee4d7d8d846
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Apr 26 23:17:51 2024 +0300

    [improve][broker] Don't use forkjoin pool by default for deleting partitioned topics (#22598)
---
 .../apache/pulsar/broker/resources/NamespaceResources.java | 14 +++++++++++---
 .../apache/pulsar/broker/resources/PulsarResources.java    | 12 ++++++++++--
 .../main/java/org/apache/pulsar/broker/PulsarService.java  |  2 +-
 3 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 1ba353dccaa..975b23192f9 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -54,10 +56,14 @@ public class NamespaceResources extends BaseResources<Policies> {
     private static final String NAMESPACE_BASE_PATH = "/namespace";
 
     public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec) {
+        this(configurationStore, operationTimeoutSec, ForkJoinPool.commonPool());
+    }
+
+    public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec, Executor executor) {
         super(configurationStore, Policies.class, operationTimeoutSec);
         this.configurationStore = configurationStore;
         isolationPolicies = new IsolationPolicyResources(configurationStore, operationTimeoutSec);
-        partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec);
+        partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec, executor);
     }
 
     public CompletableFuture<List<String>> listNamespacesAsync(String tenant) {
@@ -234,9 +240,11 @@ public class NamespaceResources extends BaseResources<Policies> {
 
     public static class PartitionedTopicResources extends BaseResources<PartitionedTopicMetadata> {
         private static final String PARTITIONED_TOPIC_PATH = "/admin/partitioned-topics";
+        private final Executor executor;
 
-        public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec) {
+        public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec, Executor executor) {
             super(configurationStore, PartitionedTopicMetadata.class, operationTimeoutSec);
+            this.executor = executor;
         }
 
         public CompletableFuture<Void> updatePartitionedTopicAsync(TopicName tn, Function<PartitionedTopicMetadata,
@@ -371,7 +379,7 @@ public class NamespaceResources extends BaseResources<Policies> {
                         future.complete(deleteResult);
                     }
                 });
-            });
+            }, executor);
 
             return future;
         }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index fe7ffe0bc7b..cc64eeb52f6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.resources;
 
 import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import lombok.Getter;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -57,13 +59,19 @@ public class PulsarResources {
     public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore) {
         this(localMetadataStore, configurationMetadataStore, DEFAULT_OPERATION_TIMEOUT_SEC);
     }
+
+    public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore,
+                           int operationTimeoutSec) {
+        this(localMetadataStore, configurationMetadataStore, operationTimeoutSec, ForkJoinPool.commonPool());
+    }
+
     public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore,
-            int operationTimeoutSec) {
+            int operationTimeoutSec, Executor executor) {
         if (configurationMetadataStore != null) {
             tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec);
             clusterResources = new ClusterResources(localMetadataStore, configurationMetadataStore,
                     operationTimeoutSec);
-            namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec);
+            namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec, executor);
             resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
         } else {
             tenantResources = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 51dffc20d07..96f3653ea99 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1000,7 +1000,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     @VisibleForTesting
     protected PulsarResources newPulsarResources() {
         PulsarResources pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore,
-                config.getMetadataStoreOperationTimeoutSeconds());
+                config.getMetadataStoreOperationTimeoutSeconds(), getExecutor());
 
         pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster);
         return pulsarResources;