You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/07 02:01:27 UTC
[pulsar] branch branch-2.7 updated: Issue #8533: Change method
`getWebServiceUrl` into async (#8746)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 0f18723 Issue #8533: Change method `getWebServiceUrl` into async (#8746)
0f18723 is described below
commit 0f18723f42b9b5c1b68752d708cc2b5efa9025a6
Author: Weijie Guo <re...@163.com>
AuthorDate: Fri Dec 11 12:59:09 2020 +0800
Issue #8533: Change method `getWebServiceUrl` into async (#8746)
fix #8533 , modified only issue-related method, may need to change all calls in this section to asynchronous later.
(cherry picked from commit 6a7ec2eec51c317af101e3da207add8e47b19151)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 101 +++++++++++----------
.../broker/admin/v1/NonPersistentTopics.java | 80 +++++++++-------
.../broker/admin/v2/NonPersistentTopics.java | 54 ++++++-----
.../pulsar/broker/namespace/NamespaceService.java | 30 ++++--
.../pulsar/broker/web/PulsarWebResource.java | 4 +-
.../common/naming/NamespaceBundleFactory.java | 4 +
.../apache/pulsar/broker/admin/NamespacesTest.java | 6 +-
7 files changed, 162 insertions(+), 117 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index df18b53..208902e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pulsar.broker.admin.impl;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -637,7 +638,7 @@ public abstract class NamespacesBase extends AdminResource {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
if (null != authService) {
authService.grantPermissionAsync(namespaceName, actions, role, null/*additional auth-data json*/)
- .get();
+ .get();
} else {
throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled");
}
@@ -1308,26 +1309,34 @@ public abstract class NamespacesBase extends AdminResource {
validatePoliciesReadOnlyAccess();
- if (!isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange)) {
- log.info("[{}] Namespace bundle is not owned by any broker {}/{}", clientAppId(), namespaceName,
- bundleRange);
- asyncResponse.resume(Response.noContent().build());
- return;
- }
-
- NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
- authoritative, true);
-
- pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)
- .thenRun(() -> {
- log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), nsBundle.toString());
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange,
- ex);
- asyncResponse.resume(new RestException(ex));
- return null;
- });
+ isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange).thenAccept(flag -> {
+ if (!flag) {
+ log.info("[{}] Namespace bundle is not owned by any broker {}/{}", clientAppId(), namespaceName,
+ bundleRange);
+ asyncResponse.resume(Response.noContent().build());
+ return;
+ }
+ NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
+ authoritative, true);
+
+ pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)
+ .thenRun(() -> {
+ log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), nsBundle.toString());
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange,
+ ex);
+ asyncResponse.resume(new RestException(ex));
+ return null;
+ });
+ }).exceptionally((ex) -> {
+ if (ex.getCause() instanceof WebApplicationException) {
+ asyncResponse.resume(ex.getCause());
+ } else {
+ asyncResponse.resume(new RestException(ex.getCause()));
+ }
+ return null;
+ });
}
@SuppressWarnings("deprecation")
@@ -1362,7 +1371,7 @@ public abstract class NamespacesBase extends AdminResource {
} catch (ExecutionException e) {
if (e.getCause() instanceof IllegalArgumentException) {
log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName,
- bundleRange, e.getMessage());
+ bundleRange, e.getMessage());
throw new RestException(Status.PRECONDITION_FAILED, "Split bundle failed due to invalid request");
} else {
log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e);
@@ -1548,7 +1557,7 @@ public abstract class NamespacesBase extends AdminResource {
namespaceName);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update the subscriptionDispatchRate for cluster on namespace {}: does not exist",
- clientAppId(), namespaceName);
+ clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
@@ -1558,7 +1567,7 @@ public abstract class NamespacesBase extends AdminResource {
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to update the subscriptionDispatchRate for cluster on namespace {}", clientAppId(),
- namespaceName, e);
+ namespaceName, e);
throw new RestException(e);
}
}
@@ -1648,7 +1657,7 @@ public abstract class NamespacesBase extends AdminResource {
namespaceName);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}: does not exist",
- clientAppId(), namespaceName);
+ clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
@@ -1658,7 +1667,7 @@ public abstract class NamespacesBase extends AdminResource {
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
- namespaceName, e);
+ namespaceName, e);
throw new RestException(e);
}
}
@@ -1902,7 +1911,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncResponse, String subscription,
- boolean authoritative) {
+ boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
checkNotNull(subscription, "Subscription should not be null");
@@ -1946,7 +1955,7 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
protected void internalClearNamespaceBundleBacklogForSubscription(String subscription, String bundleRange,
- boolean authoritative) {
+ boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
checkNotNull(subscription, "Subscription should not be null");
checkNotNull(bundleRange, "BundleRange should not be null");
@@ -1969,7 +1978,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription,
- boolean authoritative) {
+ boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
checkNotNull(subscription, "Subscription should not be null");
@@ -2131,13 +2140,13 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies){
+ protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
internalSetPolicies("inactive_topic_policies", inactiveTopicPolicies);
}
- protected void internalSetPolicies(String fieldName, Object value){
+ protected void internalSetPolicies(String fieldName, Object value) {
try {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
@@ -2270,7 +2279,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected List<String> internalGetAntiAffinityNamespaces(String cluster, String antiAffinityGroup,
- String tenant) {
+ String tenant) {
validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
checkNotNull(cluster, "Cluster should not be null");
checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null");
@@ -2758,11 +2767,11 @@ public abstract class NamespacesBase extends AdminResource {
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully updated compactionThreshold configuration: namespace={}, value={}",
- clientAppId(), namespaceName, policies.compaction_threshold);
+ clientAppId(), namespaceName, policies.compaction_threshold);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update compactionThreshold configuration for namespace {}: does not exist",
- clientAppId(), namespaceName);
+ clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to update compactionThreshold configuration for namespace {}: concurrent modification",
@@ -2772,7 +2781,7 @@ public abstract class NamespacesBase extends AdminResource {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update compactionThreshold configuration for namespace {}",
- clientAppId(), namespaceName, e);
+ clientAppId(), namespaceName, e);
throw new RestException(e);
}
}
@@ -2806,21 +2815,21 @@ public abstract class NamespacesBase extends AdminResource {
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully updated offloadThreshold configuration: namespace={}, value={}",
- clientAppId(), namespaceName, policies.offload_threshold);
+ clientAppId(), namespaceName, policies.offload_threshold);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update offloadThreshold configuration for namespace {}: does not exist",
- clientAppId(), namespaceName);
+ clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to update offloadThreshold configuration for namespace {}: concurrent modification",
- clientAppId(), namespaceName);
+ clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update offloadThreshold configuration for namespace {}",
- clientAppId(), namespaceName, e);
+ clientAppId(), namespaceName, e);
throw new RestException(e);
}
}
@@ -2854,11 +2863,11 @@ public abstract class NamespacesBase extends AdminResource {
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully updated offloadDeletionLagMs configuration: namespace={}, value={}",
- clientAppId(), namespaceName, policies.offload_deletion_lag_ms);
+ clientAppId(), namespaceName, policies.offload_deletion_lag_ms);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update offloadDeletionLagMs configuration for namespace {}: does not exist",
- clientAppId(), namespaceName);
+ clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to update offloadDeletionLagMs configuration for namespace {}: concurrent modification",
@@ -2868,7 +2877,7 @@ public abstract class NamespacesBase extends AdminResource {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update offloadDeletionLag configuration for namespace {}",
- clientAppId(), namespaceName, e);
+ clientAppId(), namespaceName, e);
throw new RestException(e);
}
}
@@ -2958,21 +2967,21 @@ public abstract class NamespacesBase extends AdminResource {
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully updated {} configuration: namespace={}, value={}",
- clientAppId(), policyName, namespaceName, getter.apply(policies));
+ clientAppId(), policyName, namespaceName, getter.apply(policies));
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update {} configuration for namespace {}: does not exist",
- clientAppId(), policyName, namespaceName);
+ clientAppId(), policyName, namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to update {} configuration for namespace {}: concurrent modification",
- clientAppId(), policyName, namespaceName);
+ clientAppId(), policyName, namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update {} configuration for namespace {}",
- clientAppId(), policyName, namespaceName, e);
+ clientAppId(), policyName, namespaceName, e);
throw new RestException(e);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 179920b..bc3ddd1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pulsar.broker.admin.v1;
import com.google.common.collect.Lists;
@@ -28,7 +29,7 @@ import io.swagger.annotations.ApiResponses;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-
+import java.util.concurrent.TimeUnit;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
@@ -73,12 +74,15 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiOperation(hidden = true, value = "Get partitioned topic metadata.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
- @ApiResponse(code = 403, message = "Don't have admin permission") })
+ @ApiResponse(code = 403, message = "Don't have admin permission")})
public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
- @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("authoritative") @DefaultValue("false")
+ boolean authoritative,
+ @QueryParam("checkAllowAutoCreation") @DefaultValue("false")
+ boolean checkAllowAutoCreation) {
validateTopicName(property, cluster, namespace, encodedTopic);
return getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation);
}
@@ -89,11 +93,12 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
- @ApiResponse(code = 404, message = "Topic does not exist") })
+ @ApiResponse(code = 404, message = "Topic does not exist")})
public NonPersistentTopicStats getStats(@PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
validateAdminOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
@@ -106,12 +111,15 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
- @ApiResponse(code = 404, message = "Topic does not exist") })
+ @ApiResponse(code = 404, message = "Topic does not exist")})
public PersistentTopicInternalStats getInternalStats(@PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
- @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
+ @PathParam("cluster") String cluster, @PathParam("namespace")
+ String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("authoritative") @DefaultValue("false")
+ boolean authoritative,
+ @QueryParam("metadata") @DefaultValue("false")
+ boolean metadata) {
validateTopicName(property, cluster, namespace, encodedTopic);
validateAdminOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
@@ -125,13 +133,17 @@ public class NonPersistentTopics extends PersistentTopics {
@PUT
@Path("/{property}/{cluster}/{namespace}/{topic}/partitions")
- @ApiOperation(hidden = true, value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
- @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
- @ApiResponse(code = 409, message = "Partitioned topic already exist") })
- public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
- int numPartitions) {
+ @ApiOperation(hidden = true, value = "Create a partitioned topic.",
+ notes = "It needs to be called before creating a producer on a partitioned topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal"
+ + " to maxNumPartitionsPerPartitionedTopic"),
+ @ApiResponse(code = 409, message = "Partitioned topic already exist")})
+ public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property, @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace, @PathParam("topic") @Encoded
+ String encodedTopic,
+ int numPartitions) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalCreatePartitionedTopic(asyncResponse, numPartitions);
@@ -147,11 +159,11 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
- @ApiResponse(code = 404, message = "Topic does not exist") })
+ @ApiResponse(code = 404, message = "Topic does not exist")})
public void unloadTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalUnloadTopic(asyncResponse, authoritative);
@@ -170,7 +182,7 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist") })
public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
+ @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
log.info("[{}] list of topics on namespace {}/{}/{}", clientAppId(), property, cluster, namespace);
Policies policies = null;
@@ -250,13 +262,15 @@ public class NonPersistentTopics extends PersistentTopics {
validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace));
}
NamespaceName fqnn = NamespaceName.get(property, cluster, namespace);
- if (!isBundleOwnedByAnyBroker(fqnn, policies.bundles, bundleRange)) {
- log.info("[{}] Namespace bundle is not owned by any broker {}/{}/{}/{}", clientAppId(), property, cluster,
- namespace, bundleRange);
- return null;
- }
- NamespaceBundle nsBundle = validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange, true, true);
try {
+ if (!isBundleOwnedByAnyBroker(fqnn, policies.bundles, bundleRange)
+ .get(pulsar().getConfig().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)) {
+ log.info("[{}] Namespace bundle is not owned by any broker {}/{}/{}/{}", clientAppId(), property,
+ cluster, namespace, bundleRange);
+ return null;
+ }
+ NamespaceBundle nsBundle = validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange,
+ true, true);
final List<String> topicList = Lists.newArrayList();
pulsar().getBrokerService().forEachTopic(topic -> {
TopicName topicName = TopicName.get(topic.getName());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 6254311..59b9dad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pulsar.broker.admin.v2;
import com.google.common.collect.Lists;
@@ -43,6 +44,7 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.pulsar.broker.PulsarServerException;
@@ -287,9 +289,9 @@ public class NonPersistentTopics extends PersistentTopics {
}
final List<String> nonPersistentTopics =
- topics.stream()
- .filter(name -> !TopicName.get(name).isPersistent())
- .collect(Collectors.toList());
+ topics.stream()
+ .filter(name -> !TopicName.get(name).isPersistent())
+ .collect(Collectors.toList());
asyncResponse.resume(nonPersistentTopics);
return null;
});
@@ -306,7 +308,8 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration"),
})
- public List<String> getListFromBundle(
+ public void getListFromBundle(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -324,29 +327,32 @@ public class NonPersistentTopics extends PersistentTopics {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
validateGlobalNamespaceOwnership(namespaceName);
- if (!isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange)) {
- log.info("[{}] Namespace bundle is not owned by any broker {}/{}", clientAppId(), namespaceName,
- bundleRange);
- return null;
- }
-
- NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, true, true);
- try {
- final List<String> topicList = Lists.newArrayList();
- pulsar().getBrokerService().forEachTopic(topic -> {
- TopicName topicName = TopicName.get(topic.getName());
- if (nsBundle.includes(topicName)) {
- topicList.add(topic.getName());
+ isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange).thenAccept(flag -> {
+ if (!flag) {
+ log.info("[{}] Namespace bundle is not owned by any broker {}/{}", clientAppId(), namespaceName,
+ bundleRange);
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles,
+ bundleRange, true, true);
+ try {
+ final List<String> topicList = Lists.newArrayList();
+ pulsar().getBrokerService().forEachTopic(topic -> {
+ TopicName topicName = TopicName.get(topic.getName());
+ if (nsBundle.includes(topicName)) {
+ topicList.add(topic.getName());
+ }
+ });
+ asyncResponse.resume(topicList);
+ } catch (Exception e) {
+ log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(),
+ namespaceName, bundleRange, e);
+ asyncResponse.resume(new RestException(e));
}
- });
- return topicList;
- } catch (Exception e) {
- log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e);
- throw new RestException(e);
- }
+ }
+ });
}
-
protected void validateAdminOperationOnTopic(TopicName topicName, boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index a73085f..99b661e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -234,34 +234,46 @@ public class NamespaceService {
return bundleFactory.getFullBundle(fqnn);
}
+ private CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn) throws Exception {
+ return bundleFactory.getFullBundleAsync(fqnn);
+ }
+
/**
- * Return the URL of the broker who's owning a particular service unit.
+ * Return the URL of the broker who's owning a particular service unit in asynchronous way
*
- * If the service unit is not owned, return an empty optional
+ * If the service unit is not owned, return a CompletableFuture with empty optional
*/
- public Optional<URL> getWebServiceUrl(ServiceUnitId suName, LookupOptions options) throws Exception {
+ public CompletableFuture<Optional<URL>> getWebServiceUrlAsync(ServiceUnitId suName, LookupOptions options) throws Exception {
if (suName instanceof TopicName) {
TopicName name = (TopicName) suName;
if (LOG.isDebugEnabled()) {
LOG.debug("Getting web service URL of topic: {} - options: {}", name, options);
}
- return this.internalGetWebServiceUrl(getBundle(name), options)
- .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
+ return getBundleAsync(name)
+ .thenCompose(namespaceBundle -> internalGetWebServiceUrl(namespaceBundle, options));
}
if (suName instanceof NamespaceName) {
- return this.internalGetWebServiceUrl(getFullBundle((NamespaceName) suName), options)
- .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
+ return getFullBundleAsync((NamespaceName) suName)
+ .thenCompose(namespaceBundle -> internalGetWebServiceUrl(namespaceBundle, options));
}
if (suName instanceof NamespaceBundle) {
- return this.internalGetWebServiceUrl((NamespaceBundle) suName, options)
- .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
+ return internalGetWebServiceUrl((NamespaceBundle) suName, options);
}
throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + suName.getClass().getName());
}
+ /**
+ * Return the URL of the broker who's owning a particular service unit.
+ *
+ * If the service unit is not owned, return an empty optional
+ */
+ public Optional<URL> getWebServiceUrl(ServiceUnitId suName, LookupOptions options) throws Exception {
+ return getWebServiceUrlAsync(suName, options).get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
+ }
+
private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(NamespaceBundle bundle, LookupOptions options) {
return findBrokerServiceUrl(bundle, options).thenApply(lookupResult -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 7478c9d..677d81e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -501,7 +501,7 @@ public abstract class PulsarWebResource {
/**
* Checks whether a given bundle is currently loaded by any broker
*/
- protected boolean isBundleOwnedByAnyBroker(NamespaceName fqnn, BundlesData bundles,
+ protected CompletableFuture<Boolean> isBundleOwnedByAnyBroker(NamespaceName fqnn, BundlesData bundles,
String bundleRange) {
NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange);
NamespaceService nsService = pulsar().getNamespaceService();
@@ -512,7 +512,7 @@ public abstract class PulsarWebResource {
.readOnly(true)
.loadTopicsInBundle(false).build();
try {
- return nsService.getWebServiceUrl(nsBundle, options).isPresent();
+ return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(optionUrl -> optionUrl.isPresent());
} catch (Exception e) {
log.error("[{}] Failed to check whether namespace bundle is owned {}/{}", clientAppId(), fqnn.toString(), bundleRange, e);
throw new RestException(e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index 1de6212..0e0b031 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -181,6 +181,10 @@ public class NamespaceBundleFactory implements ZooKeeperCacheListener<LocalPolic
return bundlesCache.synchronous().get(fqnn).getFullBundle();
}
+ public CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn) throws Exception {
+ return bundlesCache.get(fqnn).thenApply(NamespaceBundles::getFullBundle);
+ }
+
public long getLongHashCode(String name) {
return this.hashFunc.hashString(name, Charsets.UTF_8).padToLong();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index b4a0ae8..d51911a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -909,8 +909,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
final NamespaceName testNs = NamespaceName.get(this.testTenant, this.testLocalCluster, bundledNsLocal);
- doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
- .getWebServiceUrl(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(testNs)),
+ doReturn(CompletableFuture.completedFuture(Optional.of(localWebServiceUrl))).when(nsSvc)
+ .getWebServiceUrlAsync(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(testNs)),
Mockito.any());
doReturn(true).when(nsSvc)
.isServiceUnitOwned(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(testNs)));
@@ -919,7 +919,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
NamespaceBundle testBundle = nsBundles.getBundles().get(0);
// make one bundle owned
LookupOptions optionsHttps = LookupOptions.builder().authoritative(false).requestHttps(true).readOnly(false).build();
- doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testBundle, optionsHttps);
+ doReturn(CompletableFuture.completedFuture(Optional.of(localWebServiceUrl))).when(nsSvc).getWebServiceUrlAsync(testBundle, optionsHttps);
doReturn(true).when(nsSvc).isServiceUnitOwned(testBundle);
doReturn(CompletableFuture.completedFuture(null)).when(nsSvc).unloadNamespaceBundle(testBundle);
AsyncResponse response = mock(AsyncResponse.class);