You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2022/07/25 09:15:38 UTC
[pulsar] branch master updated: [improve][broker] Make splitNamespaceBundle and getTopicHashPositions async (#16411)
This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f47c705e3cc [improve][broker] Make splitNamespaceBundle and getTopicHashPositions async (#16411)
f47c705e3cc is described below
commit f47c705e3cc1cb0c34e48ad1ca71060f1202c70a
Author: gaozhangmin <zh...@apache.org>
AuthorDate: Mon Jul 25 17:15:25 2022 +0800
[improve][broker] Make splitNamespaceBundle and getTopicHashPositions async (#16411)
* make splitNamespaceBundle and getTopicHashPositions async
* appli comments
* Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
Co-authored-by: Zixuan Liu <no...@gmail.com>
* Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
Co-authored-by: Zixuan Liu <no...@gmail.com>
* Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
Co-authored-by: Zixuan Liu <no...@gmail.com>
* Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
Co-authored-by: Zixuan Liu <no...@gmail.com>
* Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
Co-authored-by: Zixuan Liu <no...@gmail.com>
* Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
Co-authored-by: Zixuan Liu <no...@gmail.com>
* fix error
* fix error
* fix check style error
Co-authored-by: gavingaozhangmin <ga...@didiglobal.com>
Co-authored-by: Zixuan Liu <no...@gmail.com>
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 189 ++++++++++-----------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 107 +++++++++---
.../apache/pulsar/broker/admin/v2/Namespaces.java | 65 ++++---
.../apache/pulsar/broker/admin/NamespacesTest.java | 9 +-
4 files changed, 214 insertions(+), 156 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index f5da9ff6aa3..5b069edf75b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1115,116 +1115,99 @@ public abstract class NamespacesBase extends AdminResource {
}
@SuppressWarnings("deprecation")
- protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleName, boolean authoritative,
- boolean unload, String splitAlgorithmName, List<Long> splitBoundaries) {
- validateSuperUserAccess();
- checkNotNull(bundleName, "BundleRange should not be null");
- log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleName);
-
- String bundleRange = getBundleRange(bundleName);
-
- Policies policies = getNamespacePolicies(namespaceName);
-
- if (namespaceName.isGlobal()) {
- // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
- validateGlobalNamespaceOwnership(namespaceName);
- } else {
- validateClusterOwnership(namespaceName.getCluster());
- validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
- }
-
- validatePoliciesReadOnlyAccess();
-
- List<String> supportedNamespaceBundleSplitAlgorithms =
- pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
- if (StringUtils.isNotBlank(splitAlgorithmName)) {
- if (!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
- asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
- "Unsupported namespace bundle split algorithm, supported algorithms are "
- + supportedNamespaceBundleSplitAlgorithms));
- }
- if (splitAlgorithmName.equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
- && (splitBoundaries == null || splitBoundaries.size() == 0)) {
- asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
- "With specified_positions_divide split algorithm, splitBoundaries must not be emtpy"));
- }
- }
-
- NamespaceBundle nsBundle;
- try {
- nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
- authoritative, false);
- } catch (Exception e) {
- asyncResponse.resume(e);
- return;
- }
+ protected CompletableFuture<Void> internalSplitNamespaceBundleAsync(String bundleName,
+ boolean authoritative, boolean unload,
+ String splitAlgorithmName,
+ List<Long> splitBoundaries) {
+ return validateSuperUserAccessAsync()
+ .thenAccept(__ -> {
+ checkNotNull(bundleName, "BundleRange should not be null");
+ log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleName);
+ List<String> supportedNamespaceBundleSplitAlgorithms =
+ pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
+ if (StringUtils.isNotBlank(splitAlgorithmName)) {
+ if (!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Unsupported namespace bundle split algorithm, supported algorithms are "
+ + supportedNamespaceBundleSplitAlgorithms);
+ }
+ if (splitAlgorithmName
+ .equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
+ && (splitBoundaries == null || splitBoundaries.size() == 0)) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "With specified_positions_divide split algorithm, splitBoundaries must not be "
+ + "emtpy");
+ }
+ }
+ })
+ .thenCompose(__ -> {
+ if (namespaceName.isGlobal()) {
+ // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return validateClusterOwnershipAsync(namespaceName.getCluster())
+ .thenCompose(ignore -> validateClusterForTenantAsync(namespaceName.getTenant(),
+ namespaceName.getCluster()));
+ }
+ })
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(policies->{
+ String bundleRange = getBundleRange(bundleName);
+ return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
+ authoritative, false)
+ .thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
+ getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries));
- pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
- getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries)
- .thenRun(() -> {
- log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- if (ex.getCause() instanceof IllegalArgumentException) {
- log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName,
- bundleRange, ex.getMessage());
- asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
- "Split bundle failed due to invalid request"));
- } else {
- log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, ex);
- asyncResponse.resume(new RestException(ex.getCause()));
- }
- return null;
- });
+ });
}
- protected void internalGetTopicHashPositions(AsyncResponse asyncResponse, String bundleRange, List<String> topics) {
+ protected CompletableFuture<TopicHashPositions> internalGetTopicHashPositionsAsync(String bundleRange,
+ List<String> topics) {
if (log.isDebugEnabled()) {
log.debug("[{}] Getting hash position for topic list {}, bundle {}", clientAppId(), topics, bundleRange);
}
- validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ);
- Policies policies = getNamespacePolicies(namespaceName);
- NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
- false, true);
- pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).whenComplete(
- (allTopicsInThisBundle, throwable) -> {
- if (throwable != null) {
- log.error("[{}] {} Failed to get topic list for bundle {}.", clientAppId(),
- namespaceName, bundle);
- asyncResponse.resume(new RestException(throwable));
- }
- // if topics is empty, return all topics' hash position in this bundle
- Map<String, Long> topicHashPositions = new HashMap<>();
- if (topics == null || topics.size() == 0) {
- allTopicsInThisBundle.forEach(t -> {
- topicHashPositions.put(t,
- pulsar().getNamespaceService().getNamespaceBundleFactory()
- .getLongHashCode(t));
- });
- } else {
- for (String topic : topics.stream().map(Codec::decode).collect(Collectors.toList())) {
- TopicName topicName = TopicName.get(topic);
- // partitioned topic
- if (topicName.getPartitionIndex() == -1) {
- allTopicsInThisBundle.stream()
- .filter(t -> TopicName.get(t).getPartitionedTopicName()
- .equals(TopicName.get(topic).getPartitionedTopicName()))
- .forEach(partition -> {
- topicHashPositions.put(partition,
- pulsar().getNamespaceService().getNamespaceBundleFactory()
- .getLongHashCode(partition));
- });
- } else { // topic partition
- if (allTopicsInThisBundle.contains(topicName.toString())) {
- topicHashPositions.put(topic,
- pulsar().getNamespaceService().getNamespaceBundleFactory()
- .getLongHashCode(topic));
+ return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(policies -> {
+ return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
+ false, true)
+ .thenCompose(nsBundle ->
+ pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(nsBundle))
+ .thenApply(allTopicsInThisBundle -> {
+ Map<String, Long> topicHashPositions = new HashMap<>();
+ if (topics == null || topics.size() == 0) {
+ allTopicsInThisBundle.forEach(t -> {
+ topicHashPositions.put(t,
+ pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getLongHashCode(t));
+ });
+ } else {
+ for (String topic : topics.stream().map(Codec::decode).toList()) {
+ TopicName topicName = TopicName.get(topic);
+ // partitioned topic
+ if (topicName.getPartitionIndex() == -1) {
+ allTopicsInThisBundle.stream()
+ .filter(t -> TopicName.get(t).getPartitionedTopicName()
+ .equals(TopicName.get(topic).getPartitionedTopicName()))
+ .forEach(partition -> {
+ topicHashPositions.put(partition,
+ pulsar().getNamespaceService()
+ .getNamespaceBundleFactory()
+ .getLongHashCode(partition));
+ });
+ } else { // topic partition
+ if (allTopicsInThisBundle.contains(topicName.toString())) {
+ topicHashPositions.put(topic,
+ pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getLongHashCode(topic));
+ }
+ }
+ }
}
- }
- }
- }
- asyncResponse.resume(
- new TopicHashPositions(namespaceName.toString(), bundleRange, topicHashPositions));
+ return new TopicHashPositions(namespaceName.toString(), bundleRange,
+ topicHashPositions);
+ });
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 4264e5669d0..d3f1fb9a9e4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -809,15 +809,22 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Namespace is not setup to split in bundles") })
- public BundlesData getBundlesData(@PathParam("property") String property, @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace) {
- validatePoliciesReadOnlyAccess();
- validateNamespaceName(property, cluster, namespace);
- validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_BUNDLE);
-
- Policies policies = getNamespacePolicies(namespaceName);
-
- return policies.bundles;
+ public void getBundlesData(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, cluster, namespace);
+ validatePoliciesReadOnlyAccessAsync()
+ .thenCompose(__ -> validateNamespaceOperationAsync(NamespaceName.get(property, namespace),
+ NamespaceOperation.GET_BUNDLE))
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> asyncResponse.resume(policies.bundles))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get bundle data for namespace {} ", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
@@ -899,15 +906,27 @@ public class Namespaces extends NamespacesBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload,
@QueryParam("splitBoundaries") @DefaultValue("") List<Long> splitBoundaries) {
- try {
- validateNamespaceName(property, cluster, namespace);
- internalSplitNamespaceBundle(asyncResponse, bundleRange,
- authoritative, unload, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME, splitBoundaries);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(property, cluster, namespace);
+ internalSplitNamespaceBundleAsync(bundleRange,
+ authoritative, unload, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME, splitBoundaries)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully split namespace bundle {}", clientAppId(), bundleRange);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to split namespace bundle {}/{}",
+ clientAppId(), namespaceName, bundleRange, ex);
+ }
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof IllegalArgumentException) {
+ asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED,
+ "Split bundle failed due to invalid request"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ return null;
+ });
}
@GET
@@ -923,17 +942,32 @@ public class Namespaces extends NamespacesBase {
@QueryParam("topics") List<String> topics,
@Suspended AsyncResponse asyncResponse) {
validateNamespaceName(property, cluster, namespace);
- internalGetTopicHashPositions(asyncResponse, bundle, topics);
+ internalGetTopicHashPositionsAsync(bundle, topics)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] {} Failed to get topic list for bundle {}.", clientAppId(),
+ namespaceName, bundle);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@Path("/{property}/{cluster}/{namespace}/publishRate")
@ApiOperation(hidden = true, value = "Set publish-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
- public void setPublishRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, PublishRate publishRate) {
+ public void setPublishRate(@Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property, @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace, PublishRate publishRate) {
validateNamespaceName(property, cluster, namespace);
- internalSetPublishRate(publishRate);
+ internalSetPublishRateAsync(publishRate)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -943,20 +977,37 @@ public class Namespaces extends NamespacesBase {
+ "-1 means msg-publish-rate or byte-publish-rate not configured in publish-rate yet")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist")})
- public PublishRate getPublishRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace) {
+ public void getPublishRate(@Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
- return internalGetPublishRate();
+ internalGetPublishRateAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("Failed to get publish rate for namespace {}", namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@Path("/{property}/{cluster}/{namespace}/dispatchRate")
@ApiOperation(hidden = true, value = "Set dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
- public void setDispatchRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, DispatchRateImpl dispatchRate) {
+ public void setDispatchRate(@Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace, DispatchRateImpl dispatchRate) {
validateNamespaceName(property, cluster, namespace);
- internalSetTopicDispatchRate(dispatchRate);
+ internalSetTopicDispatchRateAsync(dispatchRate)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to update the dispatchRate for cluster on namespace {}", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index aa3698e1c7d..6d8fe5bc1d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -58,7 +58,6 @@ import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
-import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -735,15 +734,21 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Namespace is not setup to split in bundles") })
- public BundlesData getBundlesData(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace) {
- validatePoliciesReadOnlyAccess();
+ public void getBundlesData(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_BUNDLE);
-
- Policies policies = getNamespacePolicies(namespaceName);
-
- return policies.bundles;
+ validatePoliciesReadOnlyAccessAsync()
+ .thenCompose(__ -> validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace),
+ NamespaceOperation.GET_BUNDLE))
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> asyncResponse.resume(policies.bundles))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get bundle data for namespace {}", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
@@ -826,15 +831,26 @@ public class Namespaces extends NamespacesBase {
@QueryParam("unload") @DefaultValue("false") boolean unload,
@QueryParam("splitAlgorithmName") String splitAlgorithmName,
@ApiParam("splitBoundaries") List<Long> splitBoundaries) {
- try {
- validateNamespaceName(tenant, namespace);
- internalSplitNamespaceBundle(asyncResponse,
- bundleRange, authoritative, unload, splitAlgorithmName, splitBoundaries);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(tenant, namespace);
+ internalSplitNamespaceBundleAsync(bundleRange, authoritative, unload, splitAlgorithmName, splitBoundaries)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully split namespace bundle {}", clientAppId(), bundleRange);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to split namespace bundle {}/{} due to {}",
+ clientAppId(), namespaceName, bundleRange, ex.getMessage());
+ }
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof IllegalArgumentException) {
+ asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED,
+ "Split bundle failed due to invalid request"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ return null;
+ });
}
@GET
@@ -849,8 +865,17 @@ public class Namespaces extends NamespacesBase {
@PathParam("bundle") String bundleRange,
@QueryParam("topics") List<String> topics,
@Suspended AsyncResponse asyncResponse) {
- validateNamespaceName(tenant, namespace);
- internalGetTopicHashPositions(asyncResponse, bundleRange, topics);
+ validateNamespaceName(tenant, namespace);
+ internalGetTopicHashPositionsAsync(bundleRange, topics)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] {} Failed to get topic list for bundle {}.", clientAppId(),
+ namespaceName, bundleRange);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 11a16d4d17f..2369a0af4bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -644,10 +644,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
.numBundles(boundaries.size() - 1)
.build();
createBundledTestNamespaces(this.testTenant, this.testLocalCluster, "test-bundled-namespace-1", bundle);
- BundlesData responseData = namespaces.getBundlesData(testTenant, this.testLocalCluster,
- "test-bundled-namespace-1");
-
- assertEquals(responseData, bundle);
+ assertEquals(asyncRequests(ctx -> namespaces.getBundlesData(ctx, testTenant, this.testLocalCluster,
+ "test-bundled-namespace-1")), bundle);
}
@Test
@@ -917,7 +915,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(captor.capture());
// verify split bundles
- BundlesData bundlesData = namespaces.getBundlesData(testTenant, testLocalCluster, bundledNsLocal);
+ BundlesData bundlesData = (BundlesData) asyncRequests(ctx -> namespaces.getBundlesData(ctx, testTenant,
+ testLocalCluster, bundledNsLocal));
assertNotNull(bundlesData);
assertEquals(bundlesData.getBoundaries().size(), 3);
assertEquals(bundlesData.getBoundaries().get(0), "0x00000000");