You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/07/20 00:25:17 UTC
[pulsar] branch master updated: Process requests asynchronously on some REST APIs (1) (#4765)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 849b6c5 Process requests asynchronously on some REST APIs (1) (#4765)
849b6c5 is described below
commit 849b6c57c32435e3eb77b2e852159470ec27c6f2
Author: massakam <ma...@yahoo-corp.jp>
AuthorDate: Sat Jul 20 09:25:11 2019 +0900
Process requests asynchronously on some REST APIs (1) (#4765)
---
.../broker/admin/v1/NonPersistentTopics.java | 67 +++++++++++++---------
.../pulsar/broker/admin/v1/PersistentTopics.java | 18 +++---
.../broker/admin/v2/NonPersistentTopics.java | 62 +++++++++++---------
.../pulsar/broker/admin/v2/PersistentTopics.java | 11 +++-
.../org/apache/pulsar/broker/admin/AdminTest.java | 6 +-
5 files changed, 96 insertions(+), 68 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index c01938f..dd9fc66 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -37,6 +37,8 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
@@ -160,20 +162,29 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiOperation(value = "Get the list of non-persistent topics under a namespace.", response = String.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist") })
- public List<String> getList(@PathParam("property") String property, @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace) {
+ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
+ @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
log.info("[{}] list of topics on namespace {}/{}/{}", clientAppId(), property, cluster, namespace);
- validateAdminAccessForTenant(property);
- Policies policies = getNamespacePolicies(property, cluster, namespace);
- NamespaceName nsName = NamespaceName.get(property, cluster, namespace);
- if (!cluster.equals(Constants.GLOBAL_CLUSTER)) {
- validateClusterOwnership(cluster);
- validateClusterForTenant(property, cluster);
- } else {
- // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
- validateGlobalNamespaceOwnership(nsName);
+ Policies policies = null;
+ NamespaceName nsName = null;
+ try {
+ validateAdminAccessForTenant(property);
+ policies = getNamespacePolicies(property, cluster, namespace);
+ nsName = NamespaceName.get(property, cluster, namespace);
+
+ if (!cluster.equals(Constants.GLOBAL_CLUSTER)) {
+ validateClusterOwnership(cluster);
+ validateClusterForTenant(property, cluster);
+ } else {
+ // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
+ validateGlobalNamespaceOwnership(nsName);
+ }
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ return;
}
+
final List<CompletableFuture<List<String>>> futures = Lists.newArrayList();
final List<String> boundaries = policies.bundles.getBoundaries();
for (int i = 0; i < boundaries.size() - 1; i++) {
@@ -182,30 +193,30 @@ public class NonPersistentTopics extends PersistentTopics {
futures.add(pulsar().getAdminClient().nonPersistentTopics().getListInBundleAsync(nsName.toString(),
bundle));
} catch (PulsarServerException e) {
- log.error(String.format("[%s] Failed to get list of topics under namespace %s/%s/%s/%s", clientAppId(),
- property, cluster, namespace, bundle), e);
- throw new RestException(e);
+ log.error("[{}] Failed to get list of topics under namespace {}/{}/{}/{}", clientAppId(), property,
+ cluster, namespace, bundle, e);
+ asyncResponse.resume(new RestException(e));
+ return;
}
}
+
final List<String> topics = Lists.newArrayList();
- try {
- FutureUtil.waitForAll(futures).get();
- futures.forEach(topicListFuture -> {
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ for (int i = 0; i < futures.size(); i++) {
try {
- if (topicListFuture.isDone() && topicListFuture.get() != null) {
- topics.addAll(topicListFuture.get());
+ if (futures.get(i).isDone() && futures.get(i).get() != null) {
+ topics.addAll(futures.get(i).get());
}
} catch (InterruptedException | ExecutionException e) {
- log.error(String.format("[%s] Failed to get list of topics under namespace %s/%s/%s", clientAppId(),
- property, cluster, namespace), e);
+ log.error("[{}] Failed to get list of topics under namespace {}/{}/{}", clientAppId(), property,
+ cluster, namespace, e);
+ asyncResponse.resume(new RestException(e instanceof ExecutionException ? e.getCause() : e));
+ return null;
}
- });
- } catch (InterruptedException | ExecutionException e) {
- log.error(String.format("[%s] Failed to get list of topics under namespace %s/%s/%s", clientAppId(),
- property, cluster, namespace), e);
- throw new RestException(e instanceof ExecutionException ? e.getCause() : e);
- }
- return topics;
+ }
+ asyncResponse.resume(topics);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 3b83247..78efca2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -71,10 +71,14 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiOperation(hidden = true, value = "Get the list of topics under a namespace.", response = String.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist") })
- public List<String> getList(@PathParam("property") String property, @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace) {
- validateNamespaceName(property, cluster, namespace);
- return internalGetList();
+ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
+ @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
+ try {
+ validateNamespaceName(property, cluster, namespace);
+ asyncResponse.resume(internalGetList());
+ } catch (Exception e) {
+ asyncResponse.resume(e instanceof RestException ? e : new RestException(e));
+ }
}
@GET
@@ -463,9 +467,9 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 409, message = "Compaction already running")})
- public void compact(@PathParam("property") String property, @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ public void compact(@PathParam("property") String property, @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
internalTriggerCompaction(authoritative);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index f93d88f..32047b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -38,6 +38,8 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
@@ -221,21 +223,27 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration"),
})
- public List<String> getList(
+ public void getList(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace) {
- validateNamespaceName(tenant, namespace);
- if (log.isDebugEnabled()) {
- log.debug("[{}] list of topics on namespace {}", clientAppId(), namespaceName);
- }
- validateAdminAccessForTenant(tenant);
- Policies policies = getNamespacePolicies(namespaceName);
-
+ Policies policies = null;
+ try {
+ validateNamespaceName(tenant, namespace);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] list of topics on namespace {}", clientAppId(), namespaceName);
+ }
+ validateAdminAccessForTenant(tenant);
+ policies = getNamespacePolicies(namespaceName);
- // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
- validateGlobalNamespaceOwnership(namespaceName);
+ // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
final List<CompletableFuture<List<String>>> futures = Lists.newArrayList();
final List<String> boundaries = policies.bundles.getBoundaries();
@@ -244,31 +252,29 @@ public class NonPersistentTopics extends PersistentTopics {
try {
futures.add(pulsar().getAdminClient().topics().getListInBundleAsync(namespaceName.toString(), bundle));
} catch (PulsarServerException e) {
- log.error(String.format("[%s] Failed to get list of topics under namespace %s/%s", clientAppId(),
- namespaceName, bundle), e);
- throw new RestException(e);
+ log.error("[{}] Failed to get list of topics under namespace {}/{}", clientAppId(), namespaceName,
+ bundle, e);
+ asyncResponse.resume(new RestException(e));
+ return;
}
}
+
final List<String> topics = Lists.newArrayList();
- try {
- FutureUtil.waitForAll(futures).get();
- futures.forEach(topicListFuture -> {
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ for (int i = 0; i < futures.size(); i++) {
try {
- if (topicListFuture.isDone() && topicListFuture.get() != null) {
- topics.addAll(topicListFuture.get());
+ if (futures.get(i).isDone() && futures.get(i).get() != null) {
+ topics.addAll(futures.get(i).get());
}
} catch (InterruptedException | ExecutionException e) {
- log.error(String.format("[%s] Failed to get list of topics under namespace %s", clientAppId(),
- namespaceName), e);
+ log.error("[{}] Failed to get list of topics under namespace {}", clientAppId(), namespaceName, e);
+ asyncResponse.resume(new RestException(e instanceof ExecutionException ? e.getCause() : e));
+ return null;
}
- });
- } catch (InterruptedException | ExecutionException e) {
- log.error(
- String.format("[%s] Failed to get list of topics under namespace %s", clientAppId(), namespaceName),
- e);
- throw new RestException(e instanceof ExecutionException ? e.getCause() : e);
- }
- return topics;
+ }
+ asyncResponse.resume(topics);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 81802ce..fa2bc25 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -74,13 +74,18 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
- public List<String> getList(
+ public void getList(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace) {
- validateNamespaceName(tenant, namespace);
- return internalGetList();
+ try {
+ validateNamespaceName(tenant, namespace);
+ asyncResponse.resume(internalGetList());
+ } catch (Exception e) {
+ asyncResponse.resume(e instanceof RestException ? e : new RestException(e));
+ }
}
@GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index fc1c49c..643020d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -46,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
@@ -631,8 +632,9 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(new Policies()), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- List<String> list = persistentTopics.getList(property, cluster, namespace);
- assertTrue(list.isEmpty());
+ AsyncResponse response = mock(AsyncResponse.class);
+ persistentTopics.getList(response, property, cluster, namespace);
+ verify(response, times(1)).resume(Lists.newArrayList());
// create topic
assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
persistentTopics.createPartitionedTopic(property, cluster, namespace, topic, 5);