You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:06 UTC
[pulsar] 01/29: [improve][broker] Make PulsarWebResource#getOwnerFromPeerClusterList async. (#15940)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4a02d47e269e9d005db1197cb707704fb0ea80db
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Jun 6 16:48:40 2022 +0800
[improve][broker] Make PulsarWebResource#getOwnerFromPeerClusterList async. (#15940)
(cherry picked from commit 0a6c6b6576bbae104e5b464b9a4898fc991569b1)
---
.../pulsar/broker/web/PulsarWebResource.java | 82 ++++++++++++----------
1 file changed, 45 insertions(+), 37 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index d810de85bf4..0c07ba7a091 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -830,19 +830,26 @@ public abstract class PulsarWebResource {
log.warn(msg);
validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, msg));
} else if (!policies.replication_clusters.contains(localCluster)) {
- ClusterDataImpl ownerPeerCluster = getOwnerFromPeerClusterList(pulsarService,
- policies.replication_clusters);
- if (ownerPeerCluster != null) {
- // found a peer that own this namespace
- validationFuture.complete(ownerPeerCluster);
- return;
- }
- String msg = String.format(
- "Namespace missing local cluster name in clusters list: local_cluster=%s ns=%s clusters=%s",
- localCluster, namespace.toString(), policies.replication_clusters);
-
- log.warn(msg);
- validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, msg));
+ getOwnerFromPeerClusterListAsync(pulsarService, policies.replication_clusters)
+ .thenAccept(ownerPeerCluster -> {
+ if (ownerPeerCluster != null) {
+ // found a peer that own this namespace
+ validationFuture.complete(ownerPeerCluster);
+ } else {
+ String msg = String.format(
+ "Namespace missing local cluster name in clusters list: local_cluster=%s"
+ + " ns=%s clusters=%s",
+ localCluster, namespace.toString(), policies.replication_clusters);
+ log.warn(msg);
+ validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED,
+ msg));
+ }
+ })
+ .exceptionally(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ validationFuture.completeExceptionally(new RestException(cause));
+ return null;
+ });
} else {
validationFuture.complete(null);
}
@@ -861,34 +868,35 @@ public abstract class PulsarWebResource {
return validationFuture;
}
- private static ClusterDataImpl getOwnerFromPeerClusterList(PulsarService pulsar, Set<String> replicationClusters) {
+ private static CompletableFuture<ClusterDataImpl> getOwnerFromPeerClusterListAsync(PulsarService pulsar,
+ Set<String> replicationClusters) {
String currentCluster = pulsar.getConfiguration().getClusterName();
if (replicationClusters == null || replicationClusters.isEmpty() || isBlank(currentCluster)) {
- return null;
+ return CompletableFuture.completedFuture(null);
}
- try {
- Optional<ClusterData> cluster =
- pulsar.getPulsarResources().getClusterResources().getCluster(currentCluster);
- if (!cluster.isPresent() || cluster.get().getPeerClusterNames() == null) {
- return null;
- }
- for (String peerCluster : cluster.get().getPeerClusterNames()) {
- if (replicationClusters.contains(peerCluster)) {
- return (ClusterDataImpl) pulsar.getPulsarResources().getClusterResources().getCluster(peerCluster)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND,
- "Peer cluster " + peerCluster + " data not found"));
- }
- }
- } catch (Exception e) {
- log.error("Failed to get peer-cluster {}-{}", currentCluster, e.getMessage());
- if (e instanceof RestException) {
- throw (RestException) e;
- } else {
- throw new RestException(e);
- }
- }
- return null;
+ return pulsar.getPulsarResources().getClusterResources().getClusterAsync(currentCluster)
+ .thenCompose(cluster -> {
+ if (!cluster.isPresent() || cluster.get().getPeerClusterNames() == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ for (String peerCluster : cluster.get().getPeerClusterNames()) {
+ if (replicationClusters.contains(peerCluster)) {
+ return pulsar.getPulsarResources().getClusterResources().getClusterAsync(peerCluster)
+ .thenApply(ret -> {
+ if (!ret.isPresent()) {
+ throw new RestException(Status.NOT_FOUND,
+ "Peer cluster " + peerCluster + " data not found");
+ }
+ return (ClusterDataImpl) ret.get();
+ });
+ }
+ }
+ return CompletableFuture.completedFuture(null);
+ }).exceptionally(ex -> {
+ log.error("Failed to get peer-cluster {}-{}", currentCluster, ex.getMessage());
+ throw FutureUtil.wrapToCompletionException(ex);
+ });
}
protected static CompletableFuture<Void> checkAuthorizationAsync(PulsarService pulsarService, TopicName topicName,