You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/21 07:39:18 UTC
[pulsar] branch master updated: PIP-105: new API to get subscription properties (#16095)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new face8bb0ab4 PIP-105: new API to get subscription properties (#16095)
face8bb0ab4 is described below
commit face8bb0ab48d09d5aa293cd9bbdab66232207da
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Tue Jun 21 09:39:10 2022 +0200
PIP-105: new API to get subscription properties (#16095)
---
.../broker/admin/impl/PersistentTopicsBase.java | 114 +++++++++++++++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 36 +++++++
.../broker/admin/AdminApiSubscriptionTest.java | 36 +++++++
.../org/apache/pulsar/client/admin/Topics.java | 16 +++
.../pulsar/client/admin/internal/TopicsImpl.java | 29 ++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 4 +
.../org/apache/pulsar/admin/cli/CmdTopics.java | 20 ++++
.../pulsar/tests/integration/cli/CLITest.java | 26 +++++
8 files changed, 281 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8fb47b72ae0..9d63b5e5ee3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1614,6 +1614,35 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+ /**
+  * Reads the properties of subscription {@code subName} from the topic identified by
+  * {@code topicName} and completes {@code asyncResponse} with them.
+  *
+  * <p>The chain validates topic ownership, validates the {@code CONSUME} topic operation,
+  * resolves the topic reference and then looks up the subscription. A missing subscription
+  * raises a {@code NOT_FOUND} {@link RestException}; a {@code null} property map is
+  * reported as an empty map.
+  *
+  * @param asyncResponse suspended response that is resumed with the properties or with an error
+  * @param subName name of the subscription to inspect
+  * @param authoritative forwarded to {@code validateTopicOwnershipAsync}
+  */
+ private void internalGetSubscriptionPropertiesForNonPartitionedTopic(AsyncResponse asyncResponse,
+         String subName,
+         boolean authoritative) {
+     validateTopicOwnershipAsync(topicName, authoritative)
+         .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
+         .thenCompose(__ -> getTopicReferenceAsync(topicName))
+         .thenApply((Topic topic) -> {
+             Subscription sub = topic.getSubscription(subName);
+             if (sub == null) {
+                 throw new RestException(Status.NOT_FOUND,
+                         getSubNotFoundErrorMessage(topicName.toString(), subName));
+             }
+             return sub.getSubscriptionProperties();
+         }).thenAccept((Map<String, String> properties) -> {
+             if (properties == null) {
+                 properties = Collections.emptyMap();
+             }
+             asyncResponse.resume(Response.ok(properties).build());
+         }).exceptionally(ex -> {
+             // Failures from earlier stages normally arrive wrapped; fall back to the
+             // throwable itself so a null cause is never propagated to the client.
+             Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
+             // If the exception is not redirect exception we need to log it.
+             if (!isRedirectException(ex)) {
+                 log.error("[{}] Failed to get properties for subscription {} {}",
+                         clientAppId(), topicName, subName, cause);
+             }
+             asyncResponse.resume(new RestException(cause));
+             return null;
+         });
+ }
+
protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
String subName, boolean authoritative) {
CompletableFuture<Void> future;
@@ -2422,6 +2451,91 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+ /**
+  * Entry point for reading the properties of subscription {@code subName} on
+  * {@code topicName}; completes {@code asyncResponse} with a {@code Map<String, String>}.
+  *
+  * <p>After ownership validation, one of two paths is taken:
+  * <ul>
+  *   <li>the subscription is read directly through
+  *       {@code internalGetSubscriptionPropertiesForNonPartitionedTopic} when
+  *       {@code topicName.isPartitioned()} is true or the partitioned-topic metadata reports
+  *       no partitions;</li>
+  *   <li>otherwise every partition is queried through the admin client and the per-partition
+  *       maps are merged into one result.</li>
+  * </ul>
+  *
+  * @param asyncResponse suspended response that is resumed exactly once with the result or an error
+  * @param subName name of the subscription to inspect
+  * @param authoritative forwarded to the ownership and metadata lookups
+  */
+ protected void internalGetSubscriptionProperties(AsyncResponse asyncResponse, String subName,
+         boolean authoritative) {
+     CompletableFuture<Void> future;
+     if (topicName.isGlobal()) {
+         future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+     } else {
+         future = CompletableFuture.completedFuture(null);
+     }
+
+     future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
+         // NOTE(review): presumably isPartitioned() is true when the name addresses one
+         // specific partition, which is then handled like a plain topic - confirm in TopicName.
+         if (topicName.isPartitioned()) {
+             internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
+                     authoritative);
+         } else {
+             getPartitionedTopicMetadataAsync(topicName,
+                     authoritative, false).thenAcceptAsync(partitionMetadata -> {
+                 if (partitionMetadata.partitions > 0) {
+                     final List<CompletableFuture<Map<String, String>>> futures = Lists.newArrayList();
+
+                     for (int i = 0; i < partitionMetadata.partitions; i++) {
+                         TopicName topicNamePartition = topicName.getPartition(i);
+                         try {
+                             futures.add(pulsar().getAdminClient().topics()
+                                     .getSubscriptionPropertiesAsync(topicNamePartition.toString(),
+                                             subName));
+                         } catch (Exception e) {
+                             log.error("[{}] Failed to get properties for subscription {} {}",
+                                     clientAppId(), topicNamePartition, subName,
+                                     e);
+                             asyncResponse.resume(new RestException(e));
+                             return;
+                         }
+                     }
+
+                     FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                         if (exception != null) {
+                             Throwable t = exception.getCause();
+                             if (t instanceof NotFoundException) {
+                                 asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                         getSubNotFoundErrorMessage(topicName.toString(), subName)));
+                                 return null;
+                             } else {
+                                 log.error("[{}] Failed to get properties for subscription {} {}",
+                                         clientAppId(), topicName, subName, t);
+                                 asyncResponse.resume(new RestException(t));
+                                 return null;
+                             }
+                         }
+
+                         Map<String, String> aggregatedResult = new HashMap<>();
+                         // in theory all the partitions have the same properties
+                         for (CompletableFuture<Map<String, String>> f : futures) {
+                             try {
+                                 aggregatedResult.putAll(f.get());
+                             } catch (Exception impossible) {
+                                 // we already waited for this Future; stop here so the
+                                 // response is not resumed a second time with a success payload
+                                 asyncResponse.resume(new RestException(impossible));
+                                 return null;
+                             }
+                         }
+
+                         asyncResponse.resume(Response.ok(aggregatedResult).build());
+                         return null;
+                     });
+                 } else {
+                     internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
+                             authoritative);
+                 }
+             }, pulsar().getExecutor()).exceptionally(ex -> {
+                 log.error("[{}] Failed to get properties for subscription {} from topic {}",
+                         clientAppId(), subName, topicName, ex);
+                 resumeAsyncResponseExceptionally(asyncResponse, ex);
+                 return null;
+             });
+         }
+     }).exceptionally(ex -> {
+         // If the exception is not redirect exception we need to log it.
+         if (!isRedirectException(ex)) {
+             log.error("[{}] Failed to get properties for subscription {} from topic {}",
+                     clientAppId(), subName, topicName, ex);
+         }
+         resumeAsyncResponseExceptionally(asyncResponse, ex);
+         return null;
+     });
+ }
+
protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
CompletableFuture<Void> ret;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index daadb9894bc..d0033d91e68 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1598,6 +1598,42 @@ public class PersistentTopics extends PersistentTopicsBase {
}
}
+ /**
+  * REST endpoint ({@code GET .../subscription/{subName}/properties}) that returns the
+  * properties of a subscription.
+  *
+  * <p>Validates the topic name, decodes the subscription name and delegates to
+  * {@code internalGetSubscriptionProperties}. Any exception thrown synchronously is
+  * handed to {@code asyncResponse}; a non-{@link WebApplicationException} is first wrapped
+  * in a {@link RestException}.
+  */
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/properties")
+ @ApiOperation(value = "Return all the properties on the given subscription")
+ @ApiResponses(value = {
+         @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+         @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or "
+                 + "subscriber is not authorized to access this operation"),
+         @ApiResponse(code = 403, message = "Don't have admin permission"),
+         @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
+         @ApiResponse(code = 405, message = "Method Not Allowed"),
+         @ApiResponse(code = 500, message = "Internal server error"),
+         @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
+ })
+ public void getSubscriptionProperties(
+         @Suspended final AsyncResponse asyncResponse,
+         @ApiParam(value = "Specify the tenant", required = true)
+         @PathParam("tenant") String tenant,
+         @ApiParam(value = "Specify the namespace", required = true)
+         @PathParam("namespace") String namespace,
+         @ApiParam(value = "Specify topic name", required = true)
+         @PathParam("topic") @Encoded String encodedTopic,
+         @ApiParam(value = "Subscription", required = true)
+         @PathParam("subName") String encodedSubName,
+         @ApiParam(value = "Is authentication required to perform this operation")
+         @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+     try {
+         validateTopicName(tenant, namespace, encodedTopic);
+         internalGetSubscriptionProperties(asyncResponse, decode(encodedSubName),
+                 authoritative);
+     } catch (WebApplicationException wae) {
+         asyncResponse.resume(wae);
+     } catch (Exception e) {
+         asyncResponse.resume(new RestException(e));
+     }
+ }
+
@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor")
@ApiOperation(value = "Reset subscription to message position closest to given position.",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
index 67416b4288e..411565521d9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
@@ -152,6 +152,9 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
.getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+ Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName);
+ assertEquals(value, props.get("foo"));
}
// properties are never null, but an empty map
@@ -159,6 +162,9 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
.getSubscriptions().get(subscriptionName2);
assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+
+ Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName2);
+ assertTrue(props.isEmpty());
}
// aggregated properties
@@ -166,12 +172,21 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
.getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+ Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+ assertEquals(value, props.get("foo"));
+
} else {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+ Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+ assertEquals(value, props.get("foo"));
+
SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2);
assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
+
+ Map<String, String> props2 = admin.topics().getSubscriptionProperties(topic, subscriptionName2);
+ assertTrue(props2.isEmpty());
}
// clear the properties on subscriptionName
@@ -183,6 +198,9 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
.getSubscriptions().get(subscriptionName);
assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+
+ Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName);
+ assertTrue(props.isEmpty());
}
// aggregated properties
@@ -190,9 +208,15 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
.getSubscriptions().get(subscriptionName);
assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+ Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+ assertTrue(props.isEmpty());
+
} else {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+
+ Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+ assertTrue(props.isEmpty());
}
// update the properties on subscriptionName
@@ -204,6 +228,9 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
.getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+ Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName);
+ assertEquals(value, props.get("foo"));
}
// aggregated properties
@@ -211,12 +238,21 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
.getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+ Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+ assertEquals(value, props.get("foo"));
+
} else {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+ Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+ assertEquals(value, props.get("foo"));
+
SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2);
assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
+
+ Map<String, String> props2 = admin.topics().getSubscriptionProperties(topic, subscriptionName2);
+ assertTrue(props2.isEmpty());
}
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index bcacca00e59..a4486d846b1 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1839,6 +1839,15 @@ public interface Topics {
void updateSubscriptionProperties(String topic, String subName, Map<String, String> subscriptionProperties)
throws PulsarAdminException;
+ /**
+ * Get Subscription Properties on a topic subscription.
+ *
+ * @param topic
+ *            name of the topic that owns the subscription
+ * @param subName
+ *            name of the subscription whose properties are requested
+ * @return the subscription properties as a key/value map
+ * @throws PulsarAdminException
+ *             if the request fails
+ */
+ Map<String, String> getSubscriptionProperties(String topic, String subName)
+ throws PulsarAdminException;
+
/**
* Reset cursor position on a topic subscription.
*
@@ -1873,6 +1882,13 @@ public interface Topics {
CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName,
Map<String, String> subscriptionProperties);
+ /**
+ * Get Subscription Properties on a topic subscription asynchronously.
+ *
+ * @param topic
+ *            name of the topic that owns the subscription
+ * @param subName
+ *            name of the subscription whose properties are requested
+ * @return a future that yields the subscription properties as a key/value map
+ */
+ CompletableFuture<Map<String, String>> getSubscriptionPropertiesAsync(String topic, String subName);
+
/**
* Reset cursor position on a topic subscription.
*
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 938b85f1920..909b8f74ea5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1213,6 +1213,12 @@ public class TopicsImpl extends BaseResource implements Topics {
sync(() -> updateSubscriptionPropertiesAsync(topic, subName, subscriptionProperties));
}
+ /**
+  * Returns the properties of {@code subName} on {@code topic} by running
+  * {@link #getSubscriptionPropertiesAsync(String, String)} through {@code sync(...)}.
+  */
+ @Override
+ public Map<String, String> getSubscriptionProperties(String topic, String subName)
+         throws PulsarAdminException {
+     return sync(() -> this.getSubscriptionPropertiesAsync(topic, subName));
+ }
+
@Override
public CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName,
Map<String, String> subscriptionProperties) {
@@ -1226,6 +1232,29 @@ public class TopicsImpl extends BaseResource implements Topics {
return asyncPutRequest(path, Entity.entity(subscriptionProperties, MediaType.APPLICATION_JSON));
}
+ @Override
+ public CompletableFuture<Map<String, String>> getSubscriptionPropertiesAsync(String topic, String subName) {
+ TopicName tn = validateTopic(topic);
+ String encodedSubName = Codec.encode(subName);
+ WebTarget path = topicPath(tn, "subscription", encodedSubName,
+ "properties");
+ final CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Map<String, String>>() {
+
+ @Override
+ public void completed(Map<String, String> response) {
+ future.complete(response);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
@Override
public void resetCursor(String topic, String subName, MessageId messageId
, boolean isExcluded) throws PulsarAdminException {
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 324c18deb38..74f12f0762e 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1494,6 +1494,10 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 --clear"));
verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", new HashMap<>());
+ cmdTopics = new CmdTopics(() -> admin);
+ cmdTopics.run(split("get-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1"));
+ verify(mockTopics).getSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1");
+
cmdTopics = new CmdTopics(() -> admin);
props = new HashMap<>();
props.put("a", "b");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index bff613f4ce3..23cffd4e3a2 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.util.DateFormatter;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.RelativeTimeUtil;
@Getter
@@ -98,6 +99,7 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("unsubscribe", new DeleteSubscription());
jcommander.addCommand("create-subscription", new CreateSubscription());
jcommander.addCommand("update-subscription-properties", new UpdateSubscriptionProperties());
+ jcommander.addCommand("get-subscription-properties", new GetSubscriptionProperties());
jcommander.addCommand("stats", new GetStats());
jcommander.addCommand("stats-internal", new GetInternalStats());
@@ -1006,6 +1008,24 @@ public class CmdTopics extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get the properties of a subscription on a topic")
+ private class GetSubscriptionProperties extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "-s",
+ "--subscription" }, description = "Subscription to describe", required = true)
+ private String subscriptionName;
+
+ @Override
+ void run() throws Exception {
+ String topic = validateTopicName(params);
+ Map<String, String> result = getTopics().getSubscriptionProperties(topic, subscriptionName);
+ // Ensure we are using JSON and not Java toString()
+ System.out.println(ObjectMapperFactory.getThreadLocal().writeValueAsString(result));
+ }
+ }
+
@Parameters(commandDescription = "Reset position for subscription to a position that is closest to "
+ "timestamp or messageId.")
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index 6c00314a7e5..a10c6ed3dd6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -207,6 +207,18 @@ public class CLITest extends PulsarTestSuite {
);
resultUpdate.assertNoOutput();
+ ContainerExecResult resultGet = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "topics",
+ "get-subscription-properties",
+ "persistent://public/default/" + topic,
+ "--subscription",
+ "" + subscriptionPrefix + i
+ );
+ assertEquals(
+ resultGet.getStdout().trim(), "{\"a\":\"e\"}",
+ "unexpected output " + resultGet.getStdout() + " - error " + resultGet.getStderr());
+
ContainerExecResult resultClear = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"topics",
@@ -217,6 +229,20 @@ public class CLITest extends PulsarTestSuite {
"" + subscriptionPrefix + i
);
resultClear.assertNoOutput();
+
+ ContainerExecResult resultGetAfterClear = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "topics",
+ "get-subscription-properties",
+ "persistent://public/default/" + topic,
+ "--subscription",
+ "" + subscriptionPrefix + i
+ );
+ assertEquals(
+ resultGetAfterClear.getStdout().trim(), "{}",
+ "unexpected output " + resultGetAfterClear.getStdout()
+ + " - error " + resultGetAfterClear.getStderr());
+
i++;
}
}