You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/07/20 00:25:17 UTC

[pulsar] branch master updated: Process requests asynchronously on some REST APIs (1) (#4765)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 849b6c5  Process requests asynchronously on some REST APIs (1) (#4765)
849b6c5 is described below

commit 849b6c57c32435e3eb77b2e852159470ec27c6f2
Author: massakam <ma...@yahoo-corp.jp>
AuthorDate: Sat Jul 20 09:25:11 2019 +0900

    Process requests asynchronously on some REST APIs (1) (#4765)
---
 .../broker/admin/v1/NonPersistentTopics.java       | 67 +++++++++++++---------
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 18 +++---
 .../broker/admin/v2/NonPersistentTopics.java       | 62 +++++++++++---------
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 11 +++-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  6 +-
 5 files changed, 96 insertions(+), 68 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index c01938f..dd9fc66 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -37,6 +37,8 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response.Status;
 
@@ -160,20 +162,29 @@ public class NonPersistentTopics extends PersistentTopics {
     @ApiOperation(value = "Get the list of non-persistent topics under a namespace.", response = String.class, responseContainer = "List")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace doesn't exist") })
-    public List<String> getList(@PathParam("property") String property, @PathParam("cluster") String cluster,
-                                @PathParam("namespace") String namespace) {
+    public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
+            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
         log.info("[{}] list of topics on namespace {}/{}/{}", clientAppId(), property, cluster, namespace);
-        validateAdminAccessForTenant(property);
-        Policies policies = getNamespacePolicies(property, cluster, namespace);
-        NamespaceName nsName = NamespaceName.get(property, cluster, namespace);
 
-        if (!cluster.equals(Constants.GLOBAL_CLUSTER)) {
-            validateClusterOwnership(cluster);
-            validateClusterForTenant(property, cluster);
-        } else {
-            // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
-            validateGlobalNamespaceOwnership(nsName);
+        Policies policies = null;
+        NamespaceName nsName = null;
+        try {
+            validateAdminAccessForTenant(property);
+            policies = getNamespacePolicies(property, cluster, namespace);
+            nsName = NamespaceName.get(property, cluster, namespace);
+
+            if (!cluster.equals(Constants.GLOBAL_CLUSTER)) {
+                validateClusterOwnership(cluster);
+                validateClusterForTenant(property, cluster);
+            } else {
+                // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
+                validateGlobalNamespaceOwnership(nsName);
+            }
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+            return;
         }
+
         final List<CompletableFuture<List<String>>> futures = Lists.newArrayList();
         final List<String> boundaries = policies.bundles.getBoundaries();
         for (int i = 0; i < boundaries.size() - 1; i++) {
@@ -182,30 +193,30 @@ public class NonPersistentTopics extends PersistentTopics {
                 futures.add(pulsar().getAdminClient().nonPersistentTopics().getListInBundleAsync(nsName.toString(),
                         bundle));
             } catch (PulsarServerException e) {
-                log.error(String.format("[%s] Failed to get list of topics under namespace %s/%s/%s/%s", clientAppId(),
-                        property, cluster, namespace, bundle), e);
-                throw new RestException(e);
+                log.error("[{}] Failed to get list of topics under namespace {}/{}/{}/{}", clientAppId(), property,
+                        cluster, namespace, bundle, e);
+                asyncResponse.resume(new RestException(e));
+                return;
             }
         }
+
         final List<String> topics = Lists.newArrayList();
-        try {
-            FutureUtil.waitForAll(futures).get();
-            futures.forEach(topicListFuture -> {
+        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+            for (int i = 0; i < futures.size(); i++) {
                 try {
-                    if (topicListFuture.isDone() && topicListFuture.get() != null) {
-                        topics.addAll(topicListFuture.get());
+                    if (futures.get(i).isDone() && futures.get(i).get() != null) {
+                        topics.addAll(futures.get(i).get());
                     }
                 } catch (InterruptedException | ExecutionException e) {
-                    log.error(String.format("[%s] Failed to get list of topics under namespace %s/%s/%s", clientAppId(),
-                            property, cluster, namespace), e);
+                    log.error("[{}] Failed to get list of topics under namespace {}/{}/{}", clientAppId(), property,
+                            cluster, namespace, e);
+                    asyncResponse.resume(new RestException(e instanceof ExecutionException ? e.getCause() : e));
+                    return null;
                 }
-            });
-        } catch (InterruptedException | ExecutionException e) {
-            log.error(String.format("[%s] Failed to get list of topics under namespace %s/%s/%s", clientAppId(),
-                    property, cluster, namespace), e);
-            throw new RestException(e instanceof ExecutionException ? e.getCause() : e);
-        }
-        return topics;
+            }
+            asyncResponse.resume(topics);
+            return null;
+        });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 3b83247..78efca2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -71,10 +71,14 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiOperation(hidden = true, value = "Get the list of topics under a namespace.", response = String.class, responseContainer = "List")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace doesn't exist") })
-    public List<String> getList(@PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace) {
-        validateNamespaceName(property, cluster, namespace);
-        return internalGetList();
+    public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
+            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
+        try {
+            validateNamespaceName(property, cluster, namespace);
+            asyncResponse.resume(internalGetList());
+        } catch (Exception e) {
+            asyncResponse.resume(e instanceof RestException ? e : new RestException(e));
+        }
     }
 
     @GET
@@ -463,9 +467,9 @@ public class PersistentTopics extends PersistentTopicsBase {
                             @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
                             @ApiResponse(code = 404, message = "Topic does not exist"),
                             @ApiResponse(code = 409, message = "Compaction already running")})
-   public void compact(@PathParam("property") String property, @PathParam("cluster") String cluster,
-                       @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-                       @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+    public void compact(@PathParam("property") String property, @PathParam("cluster") String cluster,
+                        @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+                        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
         internalTriggerCompaction(authoritative);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index f93d88f..32047b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -38,6 +38,8 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response.Status;
 
@@ -221,21 +223,27 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiResponse(code = 500, message = "Internal server error"),
             @ApiResponse(code = 503, message = "Failed to validate global cluster configuration"),
     })
-    public List<String> getList(
+    public void getList(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
             @PathParam("namespace") String namespace) {
-        validateNamespaceName(tenant, namespace);
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] list of topics on namespace {}", clientAppId(), namespaceName);
-        }
-        validateAdminAccessForTenant(tenant);
-        Policies policies = getNamespacePolicies(namespaceName);
-
+        Policies policies = null;
+        try {
+            validateNamespaceName(tenant, namespace);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] list of topics on namespace {}", clientAppId(), namespaceName);
+            }
+            validateAdminAccessForTenant(tenant);
+            policies = getNamespacePolicies(namespaceName);
 
-        // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
-        validateGlobalNamespaceOwnership(namespaceName);
+            // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
 
         final List<CompletableFuture<List<String>>> futures = Lists.newArrayList();
         final List<String> boundaries = policies.bundles.getBoundaries();
@@ -244,31 +252,29 @@ public class NonPersistentTopics extends PersistentTopics {
             try {
                 futures.add(pulsar().getAdminClient().topics().getListInBundleAsync(namespaceName.toString(), bundle));
             } catch (PulsarServerException e) {
-                log.error(String.format("[%s] Failed to get list of topics under namespace %s/%s", clientAppId(),
-                        namespaceName, bundle), e);
-                throw new RestException(e);
+                log.error("[{}] Failed to get list of topics under namespace {}/{}", clientAppId(), namespaceName,
+                        bundle, e);
+                asyncResponse.resume(new RestException(e));
+                return;
             }
         }
+
         final List<String> topics = Lists.newArrayList();
-        try {
-            FutureUtil.waitForAll(futures).get();
-            futures.forEach(topicListFuture -> {
+        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+            for (int i = 0; i < futures.size(); i++) {
                 try {
-                    if (topicListFuture.isDone() && topicListFuture.get() != null) {
-                        topics.addAll(topicListFuture.get());
+                    if (futures.get(i).isDone() && futures.get(i).get() != null) {
+                        topics.addAll(futures.get(i).get());
                     }
                 } catch (InterruptedException | ExecutionException e) {
-                    log.error(String.format("[%s] Failed to get list of topics under namespace %s", clientAppId(),
-                            namespaceName), e);
+                    log.error("[{}] Failed to get list of topics under namespace {}", clientAppId(), namespaceName, e);
+                    asyncResponse.resume(new RestException(e instanceof ExecutionException ? e.getCause() : e));
+                    return null;
                 }
-            });
-        } catch (InterruptedException | ExecutionException e) {
-            log.error(
-                    String.format("[%s] Failed to get list of topics under namespace %s", clientAppId(), namespaceName),
-                    e);
-            throw new RestException(e instanceof ExecutionException ? e.getCause() : e);
-        }
-        return topics;
+            }
+            asyncResponse.resume(topics);
+            return null;
+        });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 81802ce..fa2bc25 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -74,13 +74,18 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
             @ApiResponse(code = 412, message = "Namespace name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error") })
-    public List<String> getList(
+    public void getList(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
             @PathParam("namespace") String namespace) {
-        validateNamespaceName(tenant, namespace);
-        return internalGetList();
+        try {
+            validateNamespaceName(tenant, namespace);
+            asyncResponse.resume(internalGetList());
+        } catch (Exception e) {
+            asyncResponse.resume(e instanceof RestException ? e : new RestException(e));
+        }
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index fc1c49c..643020d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -46,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.core.UriInfo;
@@ -631,8 +632,9 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                 ObjectMapperFactory.getThreadLocal().writeValueAsBytes(new Policies()), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                 CreateMode.PERSISTENT);
 
-        List<String> list = persistentTopics.getList(property, cluster, namespace);
-        assertTrue(list.isEmpty());
+        AsyncResponse response = mock(AsyncResponse.class);
+        persistentTopics.getList(response, property, cluster, namespace);
+        verify(response, times(1)).resume(Lists.newArrayList());
         // create topic
         assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
         persistentTopics.createPartitionedTopic(property, cluster, namespace, topic, 5);