You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/24 00:51:11 UTC

[pulsar] branch master updated: Support admin-api for subscription-name with wildcard char (#2809)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c1dc39b  Support admin-api for subscription-name with wildcard char (#2809)
c1dc39b is described below

commit c1dc39b628fe5c7f2f4e77c261c86be45122176d
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Oct 23 17:51:06 2018 -0700

    Support admin-api for subscription-name with wildcard char (#2809)
---
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 33 +++++++++++-----------
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 33 +++++++++++-----------
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 29 ++++++++++---------
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  4 +--
 4 files changed, 51 insertions(+), 48 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index b5fd76c..c453622 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import static org.apache.pulsar.common.util.Codec.decode;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
@@ -285,10 +286,10 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 412, message = "Subscription has active consumers") })
     public void deleteSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            @PathParam("subName") String subName,
+            @PathParam("subName") String encodedSubName,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalDeleteSubscription(subName, authoritative);
+        internalDeleteSubscription(decode(encodedSubName), authoritative);
     }
 
     @POST
@@ -299,10 +300,10 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 404, message = "Topic or subscription does not exist") })
     public void skipAllMessages(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            @PathParam("subName") String subName,
+            @PathParam("subName") String encodedSubName,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalSkipAllMessages(subName, authoritative);
+        internalSkipAllMessages(decode(encodedSubName), authoritative);
     }
 
     @POST
@@ -312,10 +313,10 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 404, message = "Topic or subscription does not exist") })
     public void skipMessages(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            @PathParam("subName") String subName, @PathParam("numMessages") int numMessages,
+            @PathParam("subName") String encodedSubName, @PathParam("numMessages") int numMessages,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalSkipMessages(subName, numMessages, authoritative);
+        internalSkipMessages(decode(encodedSubName), numMessages, authoritative);
     }
 
     @POST
@@ -325,10 +326,10 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 404, message = "Topic or subscription does not exist") })
     public void expireTopicMessages(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            @PathParam("subName") String subName, @PathParam("expireTimeInSeconds") int expireTimeInSeconds,
+            @PathParam("subName") String encodedSubName, @PathParam("expireTimeInSeconds") int expireTimeInSeconds,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalExpireMessages(subName, expireTimeInSeconds, authoritative);
+        internalExpireMessages(decode(encodedSubName), expireTimeInSeconds, authoritative);
     }
 
     @POST
@@ -352,10 +353,10 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 404, message = "Topic/Subscription does not exist") })
     public void resetCursor(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            @PathParam("subName") String subName, @PathParam("timestamp") long timestamp,
+            @PathParam("subName") String encodedSubName, @PathParam("timestamp") long timestamp,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalResetCursor(subName, timestamp, authoritative);
+        internalResetCursor(decode(encodedSubName), timestamp, authoritative);
     }
 
     @POST
@@ -366,10 +367,10 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 405, message = "Not supported for partitioned topics") })
     public void resetCursorOnPosition(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            @PathParam("subName") String subName,
+            @PathParam("subName") String encodedSubName,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalResetCursorOnPosition(subName, authoritative, messageId);
+        internalResetCursorOnPosition(decode(encodedSubName), authoritative, messageId);
     }
 
     @PUT
@@ -380,10 +381,10 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 405, message = "Not supported for partitioned topics") })
     public void createSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String topic,
-           @PathParam("subscriptionName") String subscriptionName,
+           @PathParam("subscriptionName") String encodedSubName,
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
         validateTopicName(property, cluster, namespace, topic);
-        internalCreateSubscription(subscriptionName, messageId, authoritative);
+        internalCreateSubscription(decode(encodedSubName), messageId, authoritative);
     }
 
     @GET
@@ -393,10 +394,10 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist") })
     public Response peekNthMessage(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            @PathParam("subName") String subName, @PathParam("messagePosition") int messagePosition,
+            @PathParam("subName") String encodedSubName, @PathParam("messagePosition") int messagePosition,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        return internalPeekNthMessage(subName, messagePosition, authoritative);
+        return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 771a057..0737026 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -53,6 +53,7 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import static org.apache.pulsar.common.util.Codec.decode;
 
 /**
  */
@@ -291,10 +292,10 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 404, message = "Topic does not exist"),
             @ApiResponse(code = 412, message = "Subscription has active consumers") })
     public void deleteSubscription(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalDeleteSubscription(subName, authoritative);
+        internalDeleteSubscription(decode(encodedSubName), authoritative);
     }
 
     @POST
@@ -304,10 +305,10 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"),
             @ApiResponse(code = 404, message = "Topic or subscription does not exist") })
     public void skipAllMessages(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalSkipAllMessages(subName, authoritative);
+        internalSkipAllMessages(decode(encodedSubName), authoritative);
     }
 
     @POST
@@ -316,11 +317,11 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic or subscription does not exist") })
     public void skipMessages(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @PathParam("numMessages") int numMessages,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalSkipMessages(subName, numMessages, authoritative);
+        internalSkipMessages(decode(encodedSubName), numMessages, authoritative);
     }
 
     @POST
@@ -329,11 +330,11 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic or subscription does not exist") })
     public void expireTopicMessages(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @PathParam("expireTimeInSeconds") int expireTimeInSeconds,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalExpireMessages(subName, expireTimeInSeconds, authoritative);
+        internalExpireMessages(decode(encodedSubName), expireTimeInSeconds, authoritative);
     }
 
     @POST
@@ -356,10 +357,10 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
             @ApiResponse(code = 405, message = "Not supported for partitioned topics") })
     public void createSubscription(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String topic, @PathParam("subscriptionName") String subscriptionName,
+            @PathParam("topic") @Encoded String topic, @PathParam("subscriptionName") String encodedSubName,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
         validateTopicName(tenant, namespace, topic);
-        internalCreateSubscription(subscriptionName, messageId, authoritative);
+        internalCreateSubscription(decode(encodedSubName), messageId, authoritative);
     }
 
     @POST
@@ -368,11 +369,11 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic/Subscription does not exist") })
     public void resetCursor(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @PathParam("timestamp") long timestamp,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalResetCursor(subName, timestamp, authoritative);
+        internalResetCursor(decode(encodedSubName), timestamp, authoritative);
     }
 
     @POST
@@ -382,10 +383,10 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
             @ApiResponse(code = 405, message = "Not supported for partitioned topics") })
     public void resetCursorOnPosition(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalResetCursorOnPosition(subName, authoritative, messageId);
+        internalResetCursorOnPosition(decode(encodedSubName), authoritative, messageId);
     }
 
     @GET
@@ -394,11 +395,11 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist") })
     public Response peekNthMessage(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @PathParam("messagePosition") int messagePosition,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        return internalPeekNthMessage(subName, messagePosition, authoritative);
+        return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 5c18dfe..6b17a73 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -696,6 +696,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
     @Test(dataProvider = "topicName")
     public void persistentTopics(String topicName) throws Exception {
+        final String subName = topicName;
         assertEquals(admin.topics().getList("prop-xyz/ns1"), Lists.newArrayList());
 
         final String persistentTopicName = "persistent://prop-xyz/ns1/" + topicName;
@@ -708,48 +709,48 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         URL pulsarUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT);
         PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
                 .build();
-        Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName).subscriptionName("my-sub")
+        Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName).subscriptionName(subName)
                 .subscriptionType(SubscriptionType.Exclusive).subscribe();
 
-        assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList("my-sub"));
+        assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList(subName));
 
         publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 10);
 
         TopicStats topicStats = admin.topics().getStats(persistentTopicName);
-        assertEquals(topicStats.subscriptions.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
-        assertEquals(topicStats.subscriptions.get("my-sub").consumers.size(), 1);
-        assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 10);
+        assertEquals(topicStats.subscriptions.keySet(), Sets.newTreeSet(Lists.newArrayList(subName)));
+        assertEquals(topicStats.subscriptions.get(subName).consumers.size(), 1);
+        assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10);
         assertEquals(topicStats.publishers.size(), 0);
 
         PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(persistentTopicName);
-        assertEquals(internalStats.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
+        assertEquals(internalStats.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList(Codec.encode(subName))));
 
-        List<Message<byte[]>> messages = admin.topics().peekMessages(persistentTopicName, "my-sub", 3);
+        List<Message<byte[]>> messages = admin.topics().peekMessages(persistentTopicName, subName, 3);
         assertEquals(messages.size(), 3);
         for (int i = 0; i < 3; i++) {
             String expectedMessage = "message-" + i;
             assertEquals(messages.get(i).getData(), expectedMessage.getBytes());
         }
 
-        messages = admin.topics().peekMessages(persistentTopicName, "my-sub", 15);
+        messages = admin.topics().peekMessages(persistentTopicName, subName, 15);
         assertEquals(messages.size(), 10);
         for (int i = 0; i < 10; i++) {
             String expectedMessage = "message-" + i;
             assertEquals(messages.get(i).getData(), expectedMessage.getBytes());
         }
 
-        admin.topics().skipMessages(persistentTopicName, "my-sub", 5);
+        admin.topics().skipMessages(persistentTopicName, subName, 5);
         topicStats = admin.topics().getStats(persistentTopicName);
-        assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 5);
+        assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 5);
 
-        admin.topics().skipAllMessages(persistentTopicName, "my-sub");
+        admin.topics().skipAllMessages(persistentTopicName, subName);
         topicStats = admin.topics().getStats(persistentTopicName);
-        assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 0);
+        assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 0);
 
         consumer.close();
         client.close();
 
-        admin.topics().deleteSubscription(persistentTopicName, "my-sub");
+        admin.topics().deleteSubscription(persistentTopicName, subName);
 
         assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList());
         topicStats = admin.topics().getStats(persistentTopicName);
@@ -757,7 +758,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         assertEquals(topicStats.publishers.size(), 0);
 
         try {
-            admin.topics().skipAllMessages(persistentTopicName, "my-sub");
+            admin.topics().skipAllMessages(persistentTopicName, subName);
         } catch (NotFoundException e) {
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 8df9348..0b04ba3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -138,8 +138,8 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
     @Test
     public void testIncrementPartitionsOfTopic() throws Exception {
         final String topicName = "increment-partitionedTopic";
-        final String subName1 = topicName + "-my-sub-1";
-        final String subName2 = topicName + "-my-sub-2";
+        final String subName1 = topicName + "-my-sub-1/encode";
+        final String subName2 = topicName + "-my-sub-2/encode";
         final int startPartitions = 4;
         final int newPartitions = 8;
         final String partitionedTopicName = "persistent://prop-xyz/ns1/" + topicName;