You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/12/20 02:14:12 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] Branch-2.10 Avoid endless blocking call. (#18914)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 969b73b6607 [fix][broker] Branch-2.10 Avoid endless blocking call. (#18914)
969b73b6607 is described below
commit 969b73b6607128288daa40547d6fc3b78f8cee9c
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Tue Dec 20 10:14:02 2022 +0800
[fix][broker] Branch-2.10 Avoid endless blocking call. (#18914)
---
.../apache/pulsar/broker/admin/AdminResource.java | 73 +++++-----------------
.../pulsar/broker/admin/v2/PersistentTopics.java | 2 +-
.../pulsar/broker/admin/AdminResourceTest.java | 4 +-
3 files changed, 19 insertions(+), 60 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index aa873f6433d..305266ce799 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -26,9 +26,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.servlet.ServletContext;
import javax.ws.rs.WebApplicationException;
@@ -130,11 +127,7 @@ public abstract class AdminResource extends PulsarWebResource {
*/
public void validatePoliciesReadOnlyAccess() {
- try {
- validatePoliciesReadOnlyAccessAsync().join();
- } catch (CompletionException ce) {
- throw new RestException(ce.getCause());
- }
+ sync(this::validatePoliciesReadOnlyAccessAsync);
}
public CompletableFuture<Void> validatePoliciesReadOnlyAccessAsync() {
@@ -251,17 +244,11 @@ public abstract class AdminResource extends PulsarWebResource {
}
}
- protected void validatePartitionedTopicMetadata(String tenant, String namespace, String encodedTopic) {
- try {
- PartitionedTopicMetadata partitionedTopicMetadata =
- pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
- if (partitionedTopicMetadata.partitions < 1) {
- throw new RestException(Status.CONFLICT, "Topic is not partitioned topic");
- }
- } catch (InterruptedException | ExecutionException e) {
- log.error("Failed to validate partitioned topic metadata {}://{}/{}/{}",
- domain(), tenant, namespace, topicName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR, "Check topic partition meta failed.");
+ protected void validatePartitionedTopicMetadata() {
+ PartitionedTopicMetadata partitionedTopicMetadata = sync(()->
+ pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName));
+ if (partitionedTopicMetadata.partitions < 1) {
+ throw new RestException(Status.CONFLICT, "Topic is not partitioned topic");
}
}
@@ -465,28 +452,14 @@ public abstract class AdminResource extends PulsarWebResource {
});
}
- protected static PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarService pulsar, TopicName topicName) {
- try {
- return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
- } catch (Exception e) {
- if (e.getCause() instanceof RestException) {
- throw (RestException) e.getCause();
- }
- throw new RestException(e);
- }
+ protected PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarService pulsar, TopicName topicName) {
+ return sync(() -> pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName));
}
- protected static PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllowAutoCreation(
+ protected PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllowAutoCreation(
PulsarService pulsar, TopicName topicName) {
- try {
- return pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)
- .get();
- } catch (Exception e) {
- if (e.getCause() instanceof RestException) {
- throw (RestException) e.getCause();
- }
- throw new RestException(e);
- }
+ return sync(() -> pulsar.getBrokerService()
+ .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName));
}
protected void validateClusterExists(String cluster) {
@@ -523,26 +496,12 @@ public abstract class AdminResource extends PulsarWebResource {
}
protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
- try {
- return namespaceResources().getPartitionedTopicResources()
- .listPartitionedTopicsAsync(namespaceName, topicDomain)
- .join();
- } catch (Exception e) {
- log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(),
- namespaceName.toString(), e);
- throw new RestException(e);
- }
+ return sync(() -> namespaceResources().getPartitionedTopicResources()
+ .listPartitionedTopicsAsync(namespaceName, topicDomain));
}
- protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
- try {
- return getPulsarResources().getTopicResources().getExistingPartitions(topicName)
- .get(config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
- } catch (Exception e) {
- log.error("[{}] Failed to get topic partition list for namespace {}", clientAppId(),
- namespaceName.toString(), e);
- throw new RestException(e);
- }
+ protected List<String> getTopicPartitionList() {
+ return sync(()-> getPulsarResources().getTopicResources().getExistingPartitions(topicName));
}
protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions,
@@ -572,7 +531,7 @@ public abstract class AdminResource extends PulsarWebResource {
// new create check
if (maxTopicsPerNamespace > 0 && !pulsar().getBrokerService().isSystemTopic(topicName)) {
- List<String> partitionedTopics = getTopicPartitionList(TopicDomain.persistent);
+ List<String> partitionedTopics = getTopicPartitionList();
// exclude created system topic
long topicsCount =
partitionedTopics.stream().filter(t ->
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index a607db8b6aa..afc5575abc2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -810,7 +810,7 @@ public class PersistentTopics extends PersistentTopicsBase {
required = true, type = "int", defaultValue = "0")
int numPartitions) {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
- validatePartitionedTopicMetadata(tenant, namespace, encodedTopic);
+ validatePartitionedTopicMetadata();
internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
index 2e468f960b1..0217ef1b4b1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
@@ -135,11 +135,11 @@ public class AdminResourceTest extends BrokerTestBase {
resource.setPulsar(pulsar);
// validate should pass when topic is partitioned topic
resource.validatePartitionedTopicName(tenant, namespace, Codec.encode(partitionedTopic));
- resource.validatePartitionedTopicMetadata(tenant, namespace, Codec.encode(partitionedTopic));
+ resource.validatePartitionedTopicMetadata();
// validate should failed when topic is non-partitioned topic
resource.validatePartitionedTopicName(tenant, namespace, Codec.encode(nonPartitionedTopic));
try {
- resource.validatePartitionedTopicMetadata(tenant, namespace, Codec.encode(nonPartitionedTopic));
+ resource.validatePartitionedTopicMetadata();
fail("Should fail validation on non-partitioned topic");
} catch (RestException re) {
assertEquals(Status.CONFLICT.getStatusCode(), re.getResponse().getStatus());