You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/03/21 03:28:17 UTC
[pulsar] branch master updated: [PulsarAdmin] Lookup to async
(#6569)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ddf1cb9 [PulsarAdmin] Lookup to async (#6569)
ddf1cb9 is described below
commit ddf1cb9d964916e8e4ab5ce7fb84c665228ce23d
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Mar 21 11:28:00 2020 +0800
[PulsarAdmin] Lookup to async (#6569)
---
.../org/apache/pulsar/client/admin/Lookup.java | 26 +++++--
.../pulsar/client/admin/internal/LookupImpl.java | 84 +++++++++++++++++-----
2 files changed, 87 insertions(+), 23 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
index 10d244b..f84ea88 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
@@ -18,25 +18,43 @@
*/
package org.apache.pulsar.client.admin;
+import java.util.concurrent.CompletableFuture;
+
/**
* This is an interface class to allow using command line tool to quickly lookup the broker serving the topic.
*/
public interface Lookup {
/**
- * Lookup a topic
+ * Lookup a topic.
+ *
+ * @param topic
+ * @return the broker URL that serves the topic
+ */
+ String lookupTopic(String topic) throws PulsarAdminException;
+
+ /**
+ * Lookup a topic asynchronously.
*
* @param topic
* @return the broker URL that serves the topic
*/
- public String lookupTopic(String topic) throws PulsarAdminException;
+ CompletableFuture<String> lookupTopicAsync(String topic);
/**
- * Get a bundle range of a topic
+ * Get a bundle range of a topic.
*
* @param topic
* @return
* @throws PulsarAdminException
*/
- public String getBundleRange(String topic) throws PulsarAdminException;
+ String getBundleRange(String topic) throws PulsarAdminException;
+
+ /**
+ * Get a bundle range of a topic asynchronously.
+ *
+ * @param topic
+ * @return
+ */
+ CompletableFuture<String> getBundleRangeAsync(String topic);
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
index 8d665a2..4599055 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.admin.internal;
+import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import org.apache.pulsar.client.admin.Lookup;
@@ -26,6 +27,11 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.TopicName;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
public class LookupImpl extends BaseResource implements Lookup {
private final WebTarget v2lookup;
@@ -39,37 +45,77 @@ public class LookupImpl extends BaseResource implements Lookup {
@Override
public String lookupTopic(String topic) throws PulsarAdminException {
- TopicName topicName = TopicName.get(topic);
- String prefix = topicName.isV2() ? "/topic" : "/destination";
- WebTarget target = v2lookup.path(prefix).path(topicName.getLookupName());
-
try {
- return doTopicLookup(target);
- } catch (Exception e) {
- throw getApiException(e);
+ return lookupTopicAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
- public String getBundleRange(String topic) throws PulsarAdminException {
+ public CompletableFuture<String> lookupTopicAsync(String topic) {
TopicName topicName = TopicName.get(topic);
String prefix = topicName.isV2() ? "/topic" : "/destination";
- WebTarget target = v2lookup.path(prefix).path(topicName.getLookupName()).path("bundle");
+ WebTarget path = v2lookup.path(prefix).path(topicName.getLookupName());
+
+ final CompletableFuture<String> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<LookupData>() {
+ @Override
+ public void completed(LookupData lookupData) {
+ if (useTls) {
+ future.complete(lookupData.getBrokerUrlTls());
+ } else {
+ future.complete(lookupData.getBrokerUrl());
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+ @Override
+ public String getBundleRange(String topic) throws PulsarAdminException {
try {
- return request(target).get(String.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getBundleRangeAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
- private String doTopicLookup(WebTarget lookupResource) throws PulsarAdminException {
- LookupData lookupData = request(lookupResource).get(LookupData.class);
- if (useTls) {
- return lookupData.getBrokerUrlTls();
- } else {
- return lookupData.getBrokerUrl();
- }
+ @Override
+ public CompletableFuture<String> getBundleRangeAsync(String topic) {
+ TopicName topicName = TopicName.get(topic);
+ String prefix = topicName.isV2() ? "/topic" : "/destination";
+ WebTarget path = v2lookup.path(prefix).path(topicName.getLookupName()).path("bundle");
+ final CompletableFuture<String> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<String>() {
+ @Override
+ public void completed(String bundleRange) {
+ future.complete(bundleRange);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
}
}