You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/24 00:51:11 UTC
[pulsar] branch master updated: Support admin-api for
subscription-name with wildcard char (#2809)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c1dc39b Support admin-api for subscription-name with wildcard char (#2809)
c1dc39b is described below
commit c1dc39b628fe5c7f2f4e77c261c86be45122176d
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Oct 23 17:51:06 2018 -0700
Support admin-api for subscription-name with wildcard char (#2809)
---
.../pulsar/broker/admin/v1/PersistentTopics.java | 33 +++++++++++-----------
.../pulsar/broker/admin/v2/PersistentTopics.java | 33 +++++++++++-----------
.../apache/pulsar/broker/admin/AdminApiTest.java | 29 ++++++++++---------
.../apache/pulsar/broker/admin/AdminApiTest2.java | 4 +--
4 files changed, 51 insertions(+), 48 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index b5fd76c..c453622 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TopicStats;
+import static org.apache.pulsar.common.util.Codec.decode;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
@@ -285,10 +286,10 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 412, message = "Subscription has active consumers") })
public void deleteSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
- @PathParam("subName") String subName,
+ @PathParam("subName") String encodedSubName,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalDeleteSubscription(subName, authoritative);
+ internalDeleteSubscription(decode(encodedSubName), authoritative);
}
@POST
@@ -299,10 +300,10 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 404, message = "Topic or subscription does not exist") })
public void skipAllMessages(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
- @PathParam("subName") String subName,
+ @PathParam("subName") String encodedSubName,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalSkipAllMessages(subName, authoritative);
+ internalSkipAllMessages(decode(encodedSubName), authoritative);
}
@POST
@@ -312,10 +313,10 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 404, message = "Topic or subscription does not exist") })
public void skipMessages(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
- @PathParam("subName") String subName, @PathParam("numMessages") int numMessages,
+ @PathParam("subName") String encodedSubName, @PathParam("numMessages") int numMessages,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalSkipMessages(subName, numMessages, authoritative);
+ internalSkipMessages(decode(encodedSubName), numMessages, authoritative);
}
@POST
@@ -325,10 +326,10 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 404, message = "Topic or subscription does not exist") })
public void expireTopicMessages(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
- @PathParam("subName") String subName, @PathParam("expireTimeInSeconds") int expireTimeInSeconds,
+ @PathParam("subName") String encodedSubName, @PathParam("expireTimeInSeconds") int expireTimeInSeconds,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalExpireMessages(subName, expireTimeInSeconds, authoritative);
+ internalExpireMessages(decode(encodedSubName), expireTimeInSeconds, authoritative);
}
@POST
@@ -352,10 +353,10 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 404, message = "Topic/Subscription does not exist") })
public void resetCursor(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
- @PathParam("subName") String subName, @PathParam("timestamp") long timestamp,
+ @PathParam("subName") String encodedSubName, @PathParam("timestamp") long timestamp,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalResetCursor(subName, timestamp, authoritative);
+ internalResetCursor(decode(encodedSubName), timestamp, authoritative);
}
@POST
@@ -366,10 +367,10 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 405, message = "Not supported for partitioned topics") })
public void resetCursorOnPosition(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
- @PathParam("subName") String subName,
+ @PathParam("subName") String encodedSubName,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalResetCursorOnPosition(subName, authoritative, messageId);
+ internalResetCursorOnPosition(decode(encodedSubName), authoritative, messageId);
}
@PUT
@@ -380,10 +381,10 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 405, message = "Not supported for partitioned topics") })
public void createSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String topic,
- @PathParam("subscriptionName") String subscriptionName,
+ @PathParam("subscriptionName") String encodedSubName,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
validateTopicName(property, cluster, namespace, topic);
- internalCreateSubscription(subscriptionName, messageId, authoritative);
+ internalCreateSubscription(decode(encodedSubName), messageId, authoritative);
}
@GET
@@ -393,10 +394,10 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist") })
public Response peekNthMessage(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
- @PathParam("subName") String subName, @PathParam("messagePosition") int messagePosition,
+ @PathParam("subName") String encodedSubName, @PathParam("messagePosition") int messagePosition,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
- return internalPeekNthMessage(subName, messagePosition, authoritative);
+ return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 771a057..0737026 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -53,6 +53,7 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+import static org.apache.pulsar.common.util.Codec.decode;
/**
*/
@@ -291,10 +292,10 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Subscription has active consumers") })
public void deleteSubscription(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+ @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- internalDeleteSubscription(subName, authoritative);
+ internalDeleteSubscription(decode(encodedSubName), authoritative);
}
@POST
@@ -304,10 +305,10 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist") })
public void skipAllMessages(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+ @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- internalSkipAllMessages(subName, authoritative);
+ internalSkipAllMessages(decode(encodedSubName), authoritative);
}
@POST
@@ -316,11 +317,11 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist") })
public void skipMessages(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+ @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
@PathParam("numMessages") int numMessages,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- internalSkipMessages(subName, numMessages, authoritative);
+ internalSkipMessages(decode(encodedSubName), numMessages, authoritative);
}
@POST
@@ -329,11 +330,11 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist") })
public void expireTopicMessages(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+ @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
@PathParam("expireTimeInSeconds") int expireTimeInSeconds,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- internalExpireMessages(subName, expireTimeInSeconds, authoritative);
+ internalExpireMessages(decode(encodedSubName), expireTimeInSeconds, authoritative);
}
@POST
@@ -356,10 +357,10 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
@ApiResponse(code = 405, message = "Not supported for partitioned topics") })
public void createSubscription(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String topic, @PathParam("subscriptionName") String subscriptionName,
+ @PathParam("topic") @Encoded String topic, @PathParam("subscriptionName") String encodedSubName,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
validateTopicName(tenant, namespace, topic);
- internalCreateSubscription(subscriptionName, messageId, authoritative);
+ internalCreateSubscription(decode(encodedSubName), messageId, authoritative);
}
@POST
@@ -368,11 +369,11 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic/Subscription does not exist") })
public void resetCursor(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+ @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
@PathParam("timestamp") long timestamp,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- internalResetCursor(subName, timestamp, authoritative);
+ internalResetCursor(decode(encodedSubName), timestamp, authoritative);
}
@POST
@@ -382,10 +383,10 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
@ApiResponse(code = 405, message = "Not supported for partitioned topics") })
public void resetCursorOnPosition(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+ @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
validateTopicName(tenant, namespace, encodedTopic);
- internalResetCursorOnPosition(subName, authoritative, messageId);
+ internalResetCursorOnPosition(decode(encodedSubName), authoritative, messageId);
}
@GET
@@ -394,11 +395,11 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist") })
public Response peekNthMessage(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName,
+ @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
@PathParam("messagePosition") int messagePosition,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- return internalPeekNthMessage(subName, messagePosition, authoritative);
+ return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
}
@GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 5c18dfe..6b17a73 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -696,6 +696,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
@Test(dataProvider = "topicName")
public void persistentTopics(String topicName) throws Exception {
+ final String subName = topicName;
assertEquals(admin.topics().getList("prop-xyz/ns1"), Lists.newArrayList());
final String persistentTopicName = "persistent://prop-xyz/ns1/" + topicName;
@@ -708,48 +709,48 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
URL pulsarUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT);
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
- Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName).subscriptionName("my-sub")
+ Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Exclusive).subscribe();
- assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList("my-sub"));
+ assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList(subName));
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 10);
TopicStats topicStats = admin.topics().getStats(persistentTopicName);
- assertEquals(topicStats.subscriptions.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
- assertEquals(topicStats.subscriptions.get("my-sub").consumers.size(), 1);
- assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 10);
+ assertEquals(topicStats.subscriptions.keySet(), Sets.newTreeSet(Lists.newArrayList(subName)));
+ assertEquals(topicStats.subscriptions.get(subName).consumers.size(), 1);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10);
assertEquals(topicStats.publishers.size(), 0);
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(persistentTopicName);
- assertEquals(internalStats.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
+ assertEquals(internalStats.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList(Codec.encode(subName))));
- List<Message<byte[]>> messages = admin.topics().peekMessages(persistentTopicName, "my-sub", 3);
+ List<Message<byte[]>> messages = admin.topics().peekMessages(persistentTopicName, subName, 3);
assertEquals(messages.size(), 3);
for (int i = 0; i < 3; i++) {
String expectedMessage = "message-" + i;
assertEquals(messages.get(i).getData(), expectedMessage.getBytes());
}
- messages = admin.topics().peekMessages(persistentTopicName, "my-sub", 15);
+ messages = admin.topics().peekMessages(persistentTopicName, subName, 15);
assertEquals(messages.size(), 10);
for (int i = 0; i < 10; i++) {
String expectedMessage = "message-" + i;
assertEquals(messages.get(i).getData(), expectedMessage.getBytes());
}
- admin.topics().skipMessages(persistentTopicName, "my-sub", 5);
+ admin.topics().skipMessages(persistentTopicName, subName, 5);
topicStats = admin.topics().getStats(persistentTopicName);
- assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 5);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 5);
- admin.topics().skipAllMessages(persistentTopicName, "my-sub");
+ admin.topics().skipAllMessages(persistentTopicName, subName);
topicStats = admin.topics().getStats(persistentTopicName);
- assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 0);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 0);
consumer.close();
client.close();
- admin.topics().deleteSubscription(persistentTopicName, "my-sub");
+ admin.topics().deleteSubscription(persistentTopicName, subName);
assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList());
topicStats = admin.topics().getStats(persistentTopicName);
@@ -757,7 +758,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
assertEquals(topicStats.publishers.size(), 0);
try {
- admin.topics().skipAllMessages(persistentTopicName, "my-sub");
+ admin.topics().skipAllMessages(persistentTopicName, subName);
} catch (NotFoundException e) {
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 8df9348..0b04ba3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -138,8 +138,8 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
@Test
public void testIncrementPartitionsOfTopic() throws Exception {
final String topicName = "increment-partitionedTopic";
- final String subName1 = topicName + "-my-sub-1";
- final String subName2 = topicName + "-my-sub-2";
+ final String subName1 = topicName + "-my-sub-1/encode";
+ final String subName2 = topicName + "-my-sub-2/encode";
final int startPartitions = 4;
final int newPartitions = 8;
final String partitionedTopicName = "persistent://prop-xyz/ns1/" + topicName;