You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/06 08:48:49 UTC

[pulsar] branch master updated: [improve][broker] Make PulsarWebResource#getOwnerFromPeerClusterList async. (#15940)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a6c6b6576b [improve][broker] Make PulsarWebResource#getOwnerFromPeerClusterList async. (#15940)
0a6c6b6576b is described below

commit 0a6c6b6576bbae104e5b464b9a4898fc991569b1
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Jun 6 16:48:40 2022 +0800

    [improve][broker] Make PulsarWebResource#getOwnerFromPeerClusterList async. (#15940)
---
 .../pulsar/broker/web/PulsarWebResource.java       | 82 ++++++++++++----------
 1 file changed, 45 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 22449e7f119..62f01d906f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -838,19 +838,26 @@ public abstract class PulsarWebResource {
                     log.warn(msg);
                     validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, msg));
                 } else if (!policies.replication_clusters.contains(localCluster)) {
-                    ClusterDataImpl ownerPeerCluster = getOwnerFromPeerClusterList(pulsarService,
-                            policies.replication_clusters);
-                    if (ownerPeerCluster != null) {
-                        // found a peer that own this namespace
-                        validationFuture.complete(ownerPeerCluster);
-                        return;
-                    }
-                    String msg = String.format(
-                            "Namespace missing local cluster name in clusters list: local_cluster=%s ns=%s clusters=%s",
-                            localCluster, namespace.toString(), policies.replication_clusters);
-
-                    log.warn(msg);
-                    validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, msg));
+                    getOwnerFromPeerClusterListAsync(pulsarService, policies.replication_clusters)
+                            .thenAccept(ownerPeerCluster -> {
+                                if (ownerPeerCluster != null) {
+                                    // found a peer that own this namespace
+                                    validationFuture.complete(ownerPeerCluster);
+                                } else {
+                                    String msg = String.format(
+                                            "Namespace missing local cluster name in clusters list: local_cluster=%s"
+                                                    + " ns=%s clusters=%s",
+                                            localCluster, namespace.toString(), policies.replication_clusters);
+                                    log.warn(msg);
+                                    validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED,
+                                            msg));
+                                }
+                            })
+                            .exceptionally(ex -> {
+                                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                                validationFuture.completeExceptionally(new RestException(cause));
+                                return null;
+                            });
                 } else {
                     validationFuture.complete(null);
                 }
@@ -869,34 +876,35 @@ public abstract class PulsarWebResource {
         return validationFuture;
     }
 
-    private static ClusterDataImpl getOwnerFromPeerClusterList(PulsarService pulsar, Set<String> replicationClusters) {
+    private static CompletableFuture<ClusterDataImpl> getOwnerFromPeerClusterListAsync(PulsarService pulsar,
+            Set<String> replicationClusters) {
         String currentCluster = pulsar.getConfiguration().getClusterName();
         if (replicationClusters == null || replicationClusters.isEmpty() || isBlank(currentCluster)) {
-            return null;
+            return CompletableFuture.completedFuture(null);
         }
 
-        try {
-            Optional<ClusterData> cluster =
-                    pulsar.getPulsarResources().getClusterResources().getCluster(currentCluster);
-            if (!cluster.isPresent() || cluster.get().getPeerClusterNames() == null) {
-                return null;
-            }
-            for (String peerCluster : cluster.get().getPeerClusterNames()) {
-                if (replicationClusters.contains(peerCluster)) {
-                    return (ClusterDataImpl) pulsar.getPulsarResources().getClusterResources().getCluster(peerCluster)
-                            .orElseThrow(() -> new RestException(Status.NOT_FOUND,
-                                    "Peer cluster " + peerCluster + " data not found"));
-                }
-            }
-        } catch (Exception e) {
-            log.error("Failed to get peer-cluster {}-{}", currentCluster, e.getMessage());
-            if (e instanceof RestException) {
-                throw (RestException) e;
-            } else {
-                throw new RestException(e);
-            }
-        }
-        return null;
+        return pulsar.getPulsarResources().getClusterResources().getClusterAsync(currentCluster)
+                .thenCompose(cluster -> {
+                    if (!cluster.isPresent() || cluster.get().getPeerClusterNames() == null) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    for (String peerCluster : cluster.get().getPeerClusterNames()) {
+                        if (replicationClusters.contains(peerCluster)) {
+                            return pulsar.getPulsarResources().getClusterResources().getClusterAsync(peerCluster)
+                                    .thenApply(ret -> {
+                                        if (!ret.isPresent()) {
+                                            throw new RestException(Status.NOT_FOUND,
+                                                    "Peer cluster " + peerCluster + " data not found");
+                                        }
+                                        return (ClusterDataImpl) ret.get();
+                                    });
+                        }
+                    }
+                    return CompletableFuture.completedFuture(null);
+                }).exceptionally(ex -> {
+                    log.error("Failed to get peer-cluster {}-{}", currentCluster, ex.getMessage());
+                    throw FutureUtil.wrapToCompletionException(ex);
+                });
     }
 
     protected static void checkAuthorization(PulsarService pulsarService, TopicName topicName, String role,