You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/29 19:09:03 UTC

[GitHub] merlimat closed pull request #1463: Provide v2 support for admin non persistent topics.

merlimat closed pull request #1463: Provide v2 support for admin non persistent topics.
URL: https://github.com/apache/incubator-pulsar/pull/1463
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not display the original diff once such a pull request is merged):

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
index d71a4de36..ccd712a72 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
@@ -27,7 +27,6 @@
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
 
 import org.apache.pulsar.client.admin.NonPersistentTopics;
@@ -42,10 +41,12 @@
 public class NonPersistentTopicsImpl extends BaseResource implements NonPersistentTopics {
 
     private final WebTarget adminNonPersistentTopics;
+    private final WebTarget adminV2NonPersistentTopics;
 
     public NonPersistentTopicsImpl(WebTarget web, Authentication auth) {
         super(auth);
         adminNonPersistentTopics = web.path("/admin/non-persistent");
+        adminV2NonPersistentTopics = web.path("/admin/v2/non-persistent");
     }
 
     @Override
@@ -63,10 +64,9 @@ public void createPartitionedTopic(String topic, int numPartitions) throws Pulsa
     @Override
     public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions) {
         checkArgument(numPartitions > 1, "Number of partitions should be more than 1");
-        TopicName ds = validateTopic(topic);
-        return asyncPutRequest(
-                adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("partitions"),
-                Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "partitions");
+        return asyncPutRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
     }
 
     @Override
@@ -83,9 +83,10 @@ public PartitionedTopicMetadata getPartitionedTopicMetadata(String topic) throws
 
     @Override
     public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(String topic) {
-        TopicName ds = validateTopic(topic);
+        TopicName topicName = validateTopic(topic);
         final CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
-        asyncGetRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("partitions"),
+        WebTarget path = topicPath(topicName, "partitions");
+        asyncGetRequest(path,
                 new InvocationCallback<PartitionedTopicMetadata>() {
 
                     @Override
@@ -115,9 +116,10 @@ public NonPersistentTopicStats getStats(String topic) throws PulsarAdminExceptio
 
     @Override
     public CompletableFuture<NonPersistentTopicStats> getStatsAsync(String topic) {
-        TopicName ds = validateTopic(topic);
+        TopicName topicName = validateTopic(topic);
         final CompletableFuture<NonPersistentTopicStats> future = new CompletableFuture<>();
-        asyncGetRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("stats"),
+        WebTarget path = topicPath(topicName, "stats");
+        asyncGetRequest(path,
                 new InvocationCallback<NonPersistentTopicStats>() {
 
                     @Override
@@ -147,9 +149,10 @@ public PersistentTopicInternalStats getInternalStats(String topic) throws Pulsar
 
     @Override
     public CompletableFuture<PersistentTopicInternalStats> getInternalStatsAsync(String topic) {
-        TopicName ds = validateTopic(topic);
+        TopicName topicName = validateTopic(topic);
         final CompletableFuture<PersistentTopicInternalStats> future = new CompletableFuture<>();
-        asyncGetRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("internalStats"),
+        WebTarget path = topicPath(topicName, "internalStats");
+        asyncGetRequest(path,
                 new InvocationCallback<PersistentTopicInternalStats>() {
 
                     @Override
@@ -179,9 +182,9 @@ public void unload(String topic) throws PulsarAdminException {
 
     @Override
     public CompletableFuture<Void> unloadAsync(String topic) {
-        TopicName ds = validateTopic(topic);
-        return asyncPutRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("unload"),
-                Entity.entity("", MediaType.APPLICATION_JSON));
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "unload");
+        return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
     }
 
     @Override
@@ -200,8 +203,9 @@ public void unload(String topic) throws PulsarAdminException {
     public CompletableFuture<List<String>> getListInBundleAsync(String namespace, String bundleRange) {
         NamespaceName ns = NamespaceName.get(namespace);
         final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(adminNonPersistentTopics.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
-                .path(bundleRange), new InvocationCallback<List<String>>() {
+        WebTarget path = namespacePath(ns, bundleRange);
+        asyncGetRequest(path,
+                new InvocationCallback<List<String>>() {
                     @Override
                     public void completed(List<String> response) {
                         future.complete(response);
@@ -230,7 +234,8 @@ public void failed(Throwable throwable) {
     public CompletableFuture<List<String>> getListAsync(String namespace) {
         NamespaceName ns = NamespaceName.get(namespace);
         final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(adminNonPersistentTopics.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()),
+        WebTarget path = namespacePath(ns);
+        asyncGetRequest(path,
                 new InvocationCallback<List<String>>() {
                     @Override
                     public void completed(List<String> response) {
@@ -253,4 +258,17 @@ private TopicName validateTopic(String topic) {
         return TopicName.get(topic);
     }
 
+    private WebTarget namespacePath(NamespaceName namespace, String... parts) {
+        final WebTarget base = namespace.isV2() ? adminV2NonPersistentTopics : adminNonPersistentTopics;
+        WebTarget namespacePath = base.path(namespace.toString());
+        namespacePath = WebTargets.addParts(namespacePath, parts);
+        return namespacePath;
+    }
+
+    private WebTarget topicPath(TopicName topic, String... parts) {
+        final WebTarget base = topic.isV2() ? adminV2NonPersistentTopics : adminNonPersistentTopics;
+        WebTarget topicPath = base.path(topic.getRestPath());
+        topicPath = WebTargets.addParts(topicPath, parts);
+        return topicPath;
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services