You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/12/20 02:14:12 UTC

[pulsar] branch branch-2.10 updated: [fix][broker] Branch-2.10 Avoid endless blocking call. (#18914)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 969b73b6607 [fix][broker] Branch-2.10 Avoid endless blocking call. (#18914)
969b73b6607 is described below

commit 969b73b6607128288daa40547d6fc3b78f8cee9c
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Tue Dec 20 10:14:02 2022 +0800

    [fix][broker] Branch-2.10 Avoid endless blocking call. (#18914)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 73 +++++-----------------
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  2 +-
 .../pulsar/broker/admin/AdminResourceTest.java     |  4 +-
 3 files changed, 19 insertions(+), 60 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index aa873f6433d..305266ce799 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -26,9 +26,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.servlet.ServletContext;
 import javax.ws.rs.WebApplicationException;
@@ -130,11 +127,7 @@ public abstract class AdminResource extends PulsarWebResource {
      */
 
     public void validatePoliciesReadOnlyAccess() {
-        try {
-            validatePoliciesReadOnlyAccessAsync().join();
-        } catch (CompletionException ce) {
-            throw new RestException(ce.getCause());
-        }
+        sync(this::validatePoliciesReadOnlyAccessAsync);
     }
 
     public CompletableFuture<Void> validatePoliciesReadOnlyAccessAsync() {
@@ -251,17 +244,11 @@ public abstract class AdminResource extends PulsarWebResource {
         }
     }
 
-    protected void validatePartitionedTopicMetadata(String tenant, String namespace, String encodedTopic) {
-        try {
-            PartitionedTopicMetadata partitionedTopicMetadata =
-                    pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
-            if (partitionedTopicMetadata.partitions < 1) {
-                throw new RestException(Status.CONFLICT, "Topic is not partitioned topic");
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            log.error("Failed to validate partitioned topic metadata {}://{}/{}/{}",
-                    domain(), tenant, namespace, topicName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, "Check topic partition meta failed.");
+    protected void validatePartitionedTopicMetadata() {
+        PartitionedTopicMetadata partitionedTopicMetadata = sync(()->
+                pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName));
+        if (partitionedTopicMetadata.partitions < 1) {
+            throw new RestException(Status.CONFLICT, "Topic is not partitioned topic");
         }
     }
 
@@ -465,28 +452,14 @@ public abstract class AdminResource extends PulsarWebResource {
                 });
     }
 
-    protected static PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarService pulsar, TopicName topicName) {
-        try {
-            return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
-        } catch (Exception e) {
-            if (e.getCause() instanceof RestException) {
-                throw (RestException) e.getCause();
-            }
-            throw new RestException(e);
-        }
+    protected PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarService pulsar, TopicName topicName) {
+        return sync(() -> pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName));
     }
 
-    protected static PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllowAutoCreation(
+    protected PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllowAutoCreation(
             PulsarService pulsar, TopicName topicName) {
-        try {
-            return pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)
-                    .get();
-        } catch (Exception e) {
-            if (e.getCause() instanceof RestException) {
-                throw (RestException) e.getCause();
-            }
-            throw new RestException(e);
-        }
+        return sync(() -> pulsar.getBrokerService()
+                .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName));
     }
 
    protected void validateClusterExists(String cluster) {
@@ -523,26 +496,12 @@ public abstract class AdminResource extends PulsarWebResource {
     }
 
     protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
-        try {
-            return namespaceResources().getPartitionedTopicResources()
-                    .listPartitionedTopicsAsync(namespaceName, topicDomain)
-                    .join();
-        } catch (Exception e) {
-            log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(),
-                    namespaceName.toString(), e);
-            throw new RestException(e);
-        }
+        return sync(() -> namespaceResources().getPartitionedTopicResources()
+                .listPartitionedTopicsAsync(namespaceName, topicDomain));
     }
 
-    protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
-        try {
-            return getPulsarResources().getTopicResources().getExistingPartitions(topicName)
-                    .get(config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
-        } catch (Exception e) {
-            log.error("[{}] Failed to get topic partition list for namespace {}", clientAppId(),
-                    namespaceName.toString(), e);
-            throw new RestException(e);
-        }
+    protected List<String> getTopicPartitionList() {
+        return sync(()-> getPulsarResources().getTopicResources().getExistingPartitions(topicName));
     }
 
     protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions,
@@ -572,7 +531,7 @@ public abstract class AdminResource extends PulsarWebResource {
 
             // new create check
             if (maxTopicsPerNamespace > 0 && !pulsar().getBrokerService().isSystemTopic(topicName)) {
-                List<String> partitionedTopics = getTopicPartitionList(TopicDomain.persistent);
+                List<String> partitionedTopics = getTopicPartitionList();
                 // exclude created system topic
                 long topicsCount =
                         partitionedTopics.stream().filter(t ->
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index a607db8b6aa..afc5575abc2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -810,7 +810,7 @@ public class PersistentTopics extends PersistentTopicsBase {
                     required = true, type = "int", defaultValue = "0")
                     int numPartitions) {
         validatePartitionedTopicName(tenant, namespace, encodedTopic);
-        validatePartitionedTopicMetadata(tenant, namespace, encodedTopic);
+        validatePartitionedTopicMetadata();
         internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
index 2e468f960b1..0217ef1b4b1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
@@ -135,11 +135,11 @@ public class AdminResourceTest extends BrokerTestBase {
         resource.setPulsar(pulsar);
         // validate should pass when topic is partitioned topic
         resource.validatePartitionedTopicName(tenant, namespace, Codec.encode(partitionedTopic));
-        resource.validatePartitionedTopicMetadata(tenant, namespace, Codec.encode(partitionedTopic));
+        resource.validatePartitionedTopicMetadata();
         // validate should failed when topic is non-partitioned topic
         resource.validatePartitionedTopicName(tenant, namespace, Codec.encode(nonPartitionedTopic));
         try {
-            resource.validatePartitionedTopicMetadata(tenant, namespace, Codec.encode(nonPartitionedTopic));
+            resource.validatePartitionedTopicMetadata();
             fail("Should fail validation on non-partitioned topic");
         } catch (RestException re) {
             assertEquals(Status.CONFLICT.getStatusCode(), re.getResponse().getStatus());