You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/24 00:51:08 UTC

[GitHub] merlimat closed pull request #2809: Support admin-api for subscription-name with wildcard char

merlimat closed pull request #2809: Support admin-api for subscription-name with wildcard char
URL: https://github.com/apache/pulsar/pull/2809
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index b5fd76cc86..c453622536 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -48,6 +48,7 @@
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import static org.apache.pulsar.common.util.Codec.decode;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
@@ -285,10 +286,10 @@ public PartitionedTopicStats getPartitionedStats(@PathParam("property") String p
             @ApiResponse(code = 412, message = "Subscription has active consumers") })
     public void deleteSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            @PathParam("subName") String subName,
+            @PathParam("subName") String encodedSubName,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalDeleteSubscription(subName, authoritative);
+        internalDeleteSubscription(decode(encodedSubName), authoritative);
     }
 
     @POST
@@ -299,10 +300,10 @@ public void deleteSubscription(@PathParam("property") String property, @PathPara
             @ApiResponse(code = 404, message = "Topic or subscription does not exist") })
     public void skipAllMessages(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            @PathParam("subName") String subName,
+            @PathParam("subName") String encodedSubName,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalSkipAllMessages(subName, authoritative);
+        internalSkipAllMessages(decode(encodedSubName), authoritative);
     }
 
     @POST
@@ -312,10 +313,10 @@ public void skipAllMessages(@PathParam("property") String property, @PathParam("
             @ApiResponse(code = 404, message = "Topic or subscription does not exist") })
     public void skipMessages(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            @PathParam("subName") String subName, @PathParam("numMessages") int numMessages,
+            @PathParam("subName") String encodedSubName, @PathParam("numMessages") int numMessages,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalSkipMessages(subName, numMessages, authoritative);
+        internalSkipMessages(decode(encodedSubName), numMessages, authoritative);
     }
 
     @POST
@@ -325,10 +326,10 @@ public void skipMessages(@PathParam("property") String property, @PathParam("clu
             @ApiResponse(code = 404, message = "Topic or subscription does not exist") })
     public void expireTopicMessages(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            @PathParam("subName") String subName, @PathParam("expireTimeInSeconds") int expireTimeInSeconds,
+            @PathParam("subName") String encodedSubName, @PathParam("expireTimeInSeconds") int expireTimeInSeconds,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalExpireMessages(subName, expireTimeInSeconds, authoritative);
+        internalExpireMessages(decode(encodedSubName), expireTimeInSeconds, authoritative);
     }
 
     @POST
@@ -352,10 +353,10 @@ public void expireMessagesForAllSubscriptions(@PathParam("property") String prop
             @ApiResponse(code = 404, message = "Topic/Subscription does not exist") })
     public void resetCursor(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            @PathParam("subName") String subName, @PathParam("timestamp") long timestamp,
+            @PathParam("subName") String encodedSubName, @PathParam("timestamp") long timestamp,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalResetCursor(subName, timestamp, authoritative);
+        internalResetCursor(decode(encodedSubName), timestamp, authoritative);
     }
 
     @POST
@@ -366,10 +367,10 @@ public void resetCursor(@PathParam("property") String property, @PathParam("clus
             @ApiResponse(code = 405, message = "Not supported for partitioned topics") })
     public void resetCursorOnPosition(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            @PathParam("subName") String subName,
+            @PathParam("subName") String encodedSubName,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalResetCursorOnPosition(subName, authoritative, messageId);
+        internalResetCursorOnPosition(decode(encodedSubName), authoritative, messageId);
     }
 
     @PUT
@@ -380,10 +381,10 @@ public void resetCursorOnPosition(@PathParam("property") String property, @PathP
             @ApiResponse(code = 405, message = "Not supported for partitioned topics") })
     public void createSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String topic,
-           @PathParam("subscriptionName") String subscriptionName,
+           @PathParam("subscriptionName") String encodedSubName,
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
         validateTopicName(property, cluster, namespace, topic);
-        internalCreateSubscription(subscriptionName, messageId, authoritative);
+        internalCreateSubscription(decode(encodedSubName), messageId, authoritative);
     }
 
     @GET
@@ -393,10 +394,10 @@ public void createSubscription(@PathParam("property") String property, @PathPara
             @ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist") })
     public Response peekNthMessage(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            @PathParam("subName") String subName, @PathParam("messagePosition") int messagePosition,
+            @PathParam("subName") String encodedSubName, @PathParam("messagePosition") int messagePosition,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        return internalPeekNthMessage(subName, messagePosition, authoritative);
+        return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 771a057e16..0737026c0e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -53,6 +53,7 @@
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import static org.apache.pulsar.common.util.Codec.decode;
 
 /**
  */
@@ -291,10 +292,10 @@ public PartitionedTopicStats getPartitionedStats(@PathParam("tenant") String ten
             @ApiResponse(code = 404, message = "Topic does not exist"),
             @ApiResponse(code = 412, message = "Subscription has active consumers") })
     public void deleteSubscription(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalDeleteSubscription(subName, authoritative);
+        internalDeleteSubscription(decode(encodedSubName), authoritative);
     }
 
     @POST
@@ -304,10 +305,10 @@ public void deleteSubscription(@PathParam("tenant") String tenant, @PathParam("n
             @ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"),
             @ApiResponse(code = 404, message = "Topic or subscription does not exist") })
     public void skipAllMessages(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalSkipAllMessages(subName, authoritative);
+        internalSkipAllMessages(decode(encodedSubName), authoritative);
     }
 
     @POST
@@ -316,11 +317,11 @@ public void skipAllMessages(@PathParam("tenant") String tenant, @PathParam("name
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic or subscription does not exist") })
     public void skipMessages(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @PathParam("numMessages") int numMessages,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalSkipMessages(subName, numMessages, authoritative);
+        internalSkipMessages(decode(encodedSubName), numMessages, authoritative);
     }
 
     @POST
@@ -329,11 +330,11 @@ public void skipMessages(@PathParam("tenant") String tenant, @PathParam("namespa
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic or subscription does not exist") })
     public void expireTopicMessages(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @PathParam("expireTimeInSeconds") int expireTimeInSeconds,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalExpireMessages(subName, expireTimeInSeconds, authoritative);
+        internalExpireMessages(decode(encodedSubName), expireTimeInSeconds, authoritative);
     }
 
     @POST
@@ -356,10 +357,10 @@ public void expireMessagesForAllSubscriptions(@PathParam("tenant") String tenant
             @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
             @ApiResponse(code = 405, message = "Not supported for partitioned topics") })
     public void createSubscription(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String topic, @PathParam("subscriptionName") String subscriptionName,
+            @PathParam("topic") @Encoded String topic, @PathParam("subscriptionName") String encodedSubName,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
         validateTopicName(tenant, namespace, topic);
-        internalCreateSubscription(subscriptionName, messageId, authoritative);
+        internalCreateSubscription(decode(encodedSubName), messageId, authoritative);
     }
 
     @POST
@@ -368,11 +369,11 @@ public void createSubscription(@PathParam("tenant") String tenant, @PathParam("n
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic/Subscription does not exist") })
     public void resetCursor(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @PathParam("timestamp") long timestamp,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalResetCursor(subName, timestamp, authoritative);
+        internalResetCursor(decode(encodedSubName), timestamp, authoritative);
     }
 
     @POST
@@ -382,10 +383,10 @@ public void resetCursor(@PathParam("tenant") String tenant, @PathParam("namespac
             @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
             @ApiResponse(code = 405, message = "Not supported for partitioned topics") })
     public void resetCursorOnPosition(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalResetCursorOnPosition(subName, authoritative, messageId);
+        internalResetCursorOnPosition(decode(encodedSubName), authoritative, messageId);
     }
 
     @GET
@@ -394,11 +395,11 @@ public void resetCursorOnPosition(@PathParam("tenant") String tenant, @PathParam
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist") })
     public Response peekNthMessage(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+            @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @PathParam("messagePosition") int messagePosition,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        return internalPeekNthMessage(subName, messagePosition, authoritative);
+        return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 5c18dfe2fb..6b17a7320f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -696,6 +696,7 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
 
     @Test(dataProvider = "topicName")
     public void persistentTopics(String topicName) throws Exception {
+        final String subName = topicName;
         assertEquals(admin.topics().getList("prop-xyz/ns1"), Lists.newArrayList());
 
         final String persistentTopicName = "persistent://prop-xyz/ns1/" + topicName;
@@ -708,48 +709,48 @@ public void persistentTopics(String topicName) throws Exception {
         URL pulsarUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT);
         PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
                 .build();
-        Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName).subscriptionName("my-sub")
+        Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName).subscriptionName(subName)
                 .subscriptionType(SubscriptionType.Exclusive).subscribe();
 
-        assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList("my-sub"));
+        assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList(subName));
 
         publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 10);
 
         TopicStats topicStats = admin.topics().getStats(persistentTopicName);
-        assertEquals(topicStats.subscriptions.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
-        assertEquals(topicStats.subscriptions.get("my-sub").consumers.size(), 1);
-        assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 10);
+        assertEquals(topicStats.subscriptions.keySet(), Sets.newTreeSet(Lists.newArrayList(subName)));
+        assertEquals(topicStats.subscriptions.get(subName).consumers.size(), 1);
+        assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10);
         assertEquals(topicStats.publishers.size(), 0);
 
         PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(persistentTopicName);
-        assertEquals(internalStats.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
+        assertEquals(internalStats.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList(Codec.encode(subName))));
 
-        List<Message<byte[]>> messages = admin.topics().peekMessages(persistentTopicName, "my-sub", 3);
+        List<Message<byte[]>> messages = admin.topics().peekMessages(persistentTopicName, subName, 3);
         assertEquals(messages.size(), 3);
         for (int i = 0; i < 3; i++) {
             String expectedMessage = "message-" + i;
             assertEquals(messages.get(i).getData(), expectedMessage.getBytes());
         }
 
-        messages = admin.topics().peekMessages(persistentTopicName, "my-sub", 15);
+        messages = admin.topics().peekMessages(persistentTopicName, subName, 15);
         assertEquals(messages.size(), 10);
         for (int i = 0; i < 10; i++) {
             String expectedMessage = "message-" + i;
             assertEquals(messages.get(i).getData(), expectedMessage.getBytes());
         }
 
-        admin.topics().skipMessages(persistentTopicName, "my-sub", 5);
+        admin.topics().skipMessages(persistentTopicName, subName, 5);
         topicStats = admin.topics().getStats(persistentTopicName);
-        assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 5);
+        assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 5);
 
-        admin.topics().skipAllMessages(persistentTopicName, "my-sub");
+        admin.topics().skipAllMessages(persistentTopicName, subName);
         topicStats = admin.topics().getStats(persistentTopicName);
-        assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 0);
+        assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 0);
 
         consumer.close();
         client.close();
 
-        admin.topics().deleteSubscription(persistentTopicName, "my-sub");
+        admin.topics().deleteSubscription(persistentTopicName, subName);
 
         assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList());
         topicStats = admin.topics().getStats(persistentTopicName);
@@ -757,7 +758,7 @@ public void persistentTopics(String topicName) throws Exception {
         assertEquals(topicStats.publishers.size(), 0);
 
         try {
-            admin.topics().skipAllMessages(persistentTopicName, "my-sub");
+            admin.topics().skipAllMessages(persistentTopicName, subName);
         } catch (NotFoundException e) {
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 8df934853d..0b04ba3c5f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -138,8 +138,8 @@ public void cleanup() throws Exception {
     @Test
     public void testIncrementPartitionsOfTopic() throws Exception {
         final String topicName = "increment-partitionedTopic";
-        final String subName1 = topicName + "-my-sub-1";
-        final String subName2 = topicName + "-my-sub-2";
+        final String subName1 = topicName + "-my-sub-1/encode";
+        final String subName2 = topicName + "-my-sub-2/encode";
         final int startPartitions = 4;
         final int newPartitions = 8;
         final String partitionedTopicName = "persistent://prop-xyz/ns1/" + topicName;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services