You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/03/21 03:28:17 UTC

[pulsar] branch master updated: [PulsarAdmin] Lookup to async (#6569)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ddf1cb9  [PulsarAdmin] Lookup to async  (#6569)
ddf1cb9 is described below

commit ddf1cb9d964916e8e4ab5ce7fb84c665228ce23d
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Mar 21 11:28:00 2020 +0800

    [PulsarAdmin] Lookup to async  (#6569)
---
 .../org/apache/pulsar/client/admin/Lookup.java     | 26 +++++--
 .../pulsar/client/admin/internal/LookupImpl.java   | 84 +++++++++++++++++-----
 2 files changed, 87 insertions(+), 23 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
index 10d244b..f84ea88 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
@@ -18,25 +18,43 @@
  */
 package org.apache.pulsar.client.admin;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * This is an interface class to allow using command line tool to quickly lookup the broker serving the topic.
  */
 public interface Lookup {
 
     /**
-     * Lookup a topic
+     * Lookup a topic.
+     *
+     * @param topic
+     * @return the broker URL that serves the topic
+     */
+    String lookupTopic(String topic) throws PulsarAdminException;
+
+    /**
+     * Lookup a topic asynchronously.
      *
      * @param topic
      * @return the broker URL that serves the topic
      */
-    public String lookupTopic(String topic) throws PulsarAdminException;
+    CompletableFuture<String> lookupTopicAsync(String topic);
 
     /**
-     * Get a bundle range of a topic
+     * Get a bundle range of a topic.
      *
      * @param topic
      * @return
      * @throws PulsarAdminException
      */
-    public String getBundleRange(String topic) throws PulsarAdminException;
+    String getBundleRange(String topic) throws PulsarAdminException;
+
+    /**
+     * Get a bundle range of a topic asynchronously.
+     *
+     * @param topic
+     * @return
+     */
+    CompletableFuture<String> getBundleRangeAsync(String topic);
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
index 8d665a2..4599055 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.admin.internal;
 
+import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 
 import org.apache.pulsar.client.admin.Lookup;
@@ -26,6 +27,11 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.TopicName;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 public class LookupImpl extends BaseResource implements Lookup {
 
     private final WebTarget v2lookup;
@@ -39,37 +45,77 @@ public class LookupImpl extends BaseResource implements Lookup {
 
     @Override
     public String lookupTopic(String topic) throws PulsarAdminException {
-        TopicName topicName = TopicName.get(topic);
-        String prefix = topicName.isV2() ? "/topic" : "/destination";
-        WebTarget target = v2lookup.path(prefix).path(topicName.getLookupName());
-
         try {
-            return doTopicLookup(target);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return lookupTopicAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
-    public String getBundleRange(String topic) throws PulsarAdminException {
+    public CompletableFuture<String> lookupTopicAsync(String topic) {
         TopicName topicName = TopicName.get(topic);
         String prefix = topicName.isV2() ? "/topic" : "/destination";
-        WebTarget target = v2lookup.path(prefix).path(topicName.getLookupName()).path("bundle");
+        WebTarget path = v2lookup.path(prefix).path(topicName.getLookupName());
+
+        final CompletableFuture<String> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<LookupData>() {
+                    @Override
+                    public void completed(LookupData lookupData) {
+                        if (useTls) {
+                            future.complete(lookupData.getBrokerUrlTls());
+                        } else {
+                            future.complete(lookupData.getBrokerUrl());
+                        }
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
 
+    @Override
+    public String getBundleRange(String topic) throws PulsarAdminException {
         try {
-            return request(target).get(String.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getBundleRangeAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
-    private String doTopicLookup(WebTarget lookupResource) throws PulsarAdminException {
-        LookupData lookupData = request(lookupResource).get(LookupData.class);
-        if (useTls) {
-            return lookupData.getBrokerUrlTls();
-        } else {
-            return lookupData.getBrokerUrl();
-        }
+    @Override
+    public CompletableFuture<String> getBundleRangeAsync(String topic) {
+        TopicName topicName = TopicName.get(topic);
+        String prefix = topicName.isV2() ? "/topic" : "/destination";
+        WebTarget path = v2lookup.path(prefix).path(topicName.getLookupName()).path("bundle");
+        final CompletableFuture<String> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<String>() {
+                    @Override
+                    public void completed(String bundleRange) {
+                        future.complete(bundleRange);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
     }
 
 }