You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:06 UTC

[pulsar] 01/29: [improve][broker] Make PulsarWebResource#getOwnerFromPeerClusterList async. (#15940)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4a02d47e269e9d005db1197cb707704fb0ea80db
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Jun 6 16:48:40 2022 +0800

    [improve][broker] Make PulsarWebResource#getOwnerFromPeerClusterList async. (#15940)
    
    (cherry picked from commit 0a6c6b6576bbae104e5b464b9a4898fc991569b1)
---
 .../pulsar/broker/web/PulsarWebResource.java       | 82 ++++++++++++----------
 1 file changed, 45 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index d810de85bf4..0c07ba7a091 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -830,19 +830,26 @@ public abstract class PulsarWebResource {
                     log.warn(msg);
                     validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, msg));
                 } else if (!policies.replication_clusters.contains(localCluster)) {
-                    ClusterDataImpl ownerPeerCluster = getOwnerFromPeerClusterList(pulsarService,
-                            policies.replication_clusters);
-                    if (ownerPeerCluster != null) {
-                        // found a peer that own this namespace
-                        validationFuture.complete(ownerPeerCluster);
-                        return;
-                    }
-                    String msg = String.format(
-                            "Namespace missing local cluster name in clusters list: local_cluster=%s ns=%s clusters=%s",
-                            localCluster, namespace.toString(), policies.replication_clusters);
-
-                    log.warn(msg);
-                    validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, msg));
+                    getOwnerFromPeerClusterListAsync(pulsarService, policies.replication_clusters)
+                            .thenAccept(ownerPeerCluster -> {
+                                if (ownerPeerCluster != null) {
+                                    // found a peer that own this namespace
+                                    validationFuture.complete(ownerPeerCluster);
+                                } else {
+                                    String msg = String.format(
+                                            "Namespace missing local cluster name in clusters list: local_cluster=%s"
+                                                    + " ns=%s clusters=%s",
+                                            localCluster, namespace.toString(), policies.replication_clusters);
+                                    log.warn(msg);
+                                    validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED,
+                                            msg));
+                                }
+                            })
+                            .exceptionally(ex -> {
+                                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                                validationFuture.completeExceptionally(new RestException(cause));
+                                return null;
+                            });
                 } else {
                     validationFuture.complete(null);
                 }
@@ -861,34 +868,35 @@ public abstract class PulsarWebResource {
         return validationFuture;
     }
 
-    private static ClusterDataImpl getOwnerFromPeerClusterList(PulsarService pulsar, Set<String> replicationClusters) {
+    private static CompletableFuture<ClusterDataImpl> getOwnerFromPeerClusterListAsync(PulsarService pulsar,
+            Set<String> replicationClusters) {
         String currentCluster = pulsar.getConfiguration().getClusterName();
         if (replicationClusters == null || replicationClusters.isEmpty() || isBlank(currentCluster)) {
-            return null;
+            return CompletableFuture.completedFuture(null);
         }
 
-        try {
-            Optional<ClusterData> cluster =
-                    pulsar.getPulsarResources().getClusterResources().getCluster(currentCluster);
-            if (!cluster.isPresent() || cluster.get().getPeerClusterNames() == null) {
-                return null;
-            }
-            for (String peerCluster : cluster.get().getPeerClusterNames()) {
-                if (replicationClusters.contains(peerCluster)) {
-                    return (ClusterDataImpl) pulsar.getPulsarResources().getClusterResources().getCluster(peerCluster)
-                            .orElseThrow(() -> new RestException(Status.NOT_FOUND,
-                                    "Peer cluster " + peerCluster + " data not found"));
-                }
-            }
-        } catch (Exception e) {
-            log.error("Failed to get peer-cluster {}-{}", currentCluster, e.getMessage());
-            if (e instanceof RestException) {
-                throw (RestException) e;
-            } else {
-                throw new RestException(e);
-            }
-        }
-        return null;
+        return pulsar.getPulsarResources().getClusterResources().getClusterAsync(currentCluster)
+                .thenCompose(cluster -> {
+                    if (!cluster.isPresent() || cluster.get().getPeerClusterNames() == null) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    for (String peerCluster : cluster.get().getPeerClusterNames()) {
+                        if (replicationClusters.contains(peerCluster)) {
+                            return pulsar.getPulsarResources().getClusterResources().getClusterAsync(peerCluster)
+                                    .thenApply(ret -> {
+                                        if (!ret.isPresent()) {
+                                            throw new RestException(Status.NOT_FOUND,
+                                                    "Peer cluster " + peerCluster + " data not found");
+                                        }
+                                        return (ClusterDataImpl) ret.get();
+                                    });
+                        }
+                    }
+                    return CompletableFuture.completedFuture(null);
+                }).exceptionally(ex -> {
+                    log.error("Failed to get peer-cluster {}-{}", currentCluster, ex.getMessage());
+                    throw FutureUtil.wrapToCompletionException(ex);
+                });
     }
 
     protected static CompletableFuture<Void> checkAuthorizationAsync(PulsarService pulsarService, TopicName topicName,