You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/04/26 20:17:59 UTC
(pulsar) branch master updated: [improve][broker] Don't use forkjoin pool by default for deleting partitioned topics (#22598)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8323a3c4991 [improve][broker] Don't use forkjoin pool by default for deleting partitioned topics (#22598)
8323a3c4991 is described below
commit 8323a3c49912976aee723787fa67bee4d7d8d846
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Apr 26 23:17:51 2024 +0300
[improve][broker] Don't use forkjoin pool by default for deleting partitioned topics (#22598)
---
.../apache/pulsar/broker/resources/NamespaceResources.java | 14 +++++++++++---
.../apache/pulsar/broker/resources/PulsarResources.java | 12 ++++++++++--
.../main/java/org/apache/pulsar/broker/PulsarService.java | 2 +-
3 files changed, 22 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 1ba353dccaa..975b23192f9 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -24,6 +24,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -54,10 +56,14 @@ public class NamespaceResources extends BaseResources<Policies> {
private static final String NAMESPACE_BASE_PATH = "/namespace";
public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec) {
+ this(configurationStore, operationTimeoutSec, ForkJoinPool.commonPool());
+ }
+
+ public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec, Executor executor) {
super(configurationStore, Policies.class, operationTimeoutSec);
this.configurationStore = configurationStore;
isolationPolicies = new IsolationPolicyResources(configurationStore, operationTimeoutSec);
- partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec);
+ partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec, executor);
}
public CompletableFuture<List<String>> listNamespacesAsync(String tenant) {
@@ -234,9 +240,11 @@ public class NamespaceResources extends BaseResources<Policies> {
public static class PartitionedTopicResources extends BaseResources<PartitionedTopicMetadata> {
private static final String PARTITIONED_TOPIC_PATH = "/admin/partitioned-topics";
+ private final Executor executor;
- public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec) {
+ public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec, Executor executor) {
super(configurationStore, PartitionedTopicMetadata.class, operationTimeoutSec);
+ this.executor = executor;
}
public CompletableFuture<Void> updatePartitionedTopicAsync(TopicName tn, Function<PartitionedTopicMetadata,
@@ -371,7 +379,7 @@ public class NamespaceResources extends BaseResources<Policies> {
future.complete(deleteResult);
}
});
- });
+ }, executor);
return future;
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index fe7ffe0bc7b..cc64eeb52f6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.broker.resources;
import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
import lombok.Getter;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -57,13 +59,19 @@ public class PulsarResources {
public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore) {
this(localMetadataStore, configurationMetadataStore, DEFAULT_OPERATION_TIMEOUT_SEC);
}
+
+ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore,
+ int operationTimeoutSec) {
+ this(localMetadataStore, configurationMetadataStore, operationTimeoutSec, ForkJoinPool.commonPool());
+ }
+
public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore,
- int operationTimeoutSec) {
+ int operationTimeoutSec, Executor executor) {
if (configurationMetadataStore != null) {
tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec);
clusterResources = new ClusterResources(localMetadataStore, configurationMetadataStore,
operationTimeoutSec);
- namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec);
+ namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec, executor);
resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
} else {
tenantResources = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 51dffc20d07..96f3653ea99 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1000,7 +1000,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
@VisibleForTesting
protected PulsarResources newPulsarResources() {
PulsarResources pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore,
- config.getMetadataStoreOperationTimeoutSeconds());
+ config.getMetadataStoreOperationTimeoutSeconds(), getExecutor());
pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster);
return pulsarResources;