You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/12 11:26:12 UTC
[pulsar] 02/03: [feature][admin] Support to get topic properties. (#15944)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6648f996ffd3c0dae243640e7abaac51438143f8
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Jun 8 12:34:21 2022 +0800
[feature][admin] Support to get topic properties. (#15944)
Since https://github.com/apache/pulsar/pull/12818 added support for creating topics with metadata (properties), this patch adds a `get` API for retrieving a topic's properties.
(cherry picked from commit 1ebe4ee00cedbc1c9d955d93aaa9ab86e7bf8092)
---
.../apache/pulsar/broker/admin/AdminResource.java | 4 +++
.../broker/admin/impl/PersistentTopicsBase.java | 28 ++++++++++++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 34 ++++++++++++++++++++++
.../apache/pulsar/broker/admin/AdminApi2Test.java | 21 +++++++++++++
.../pulsar/broker/admin/PersistentTopicsTest.java | 13 +++++++++
.../org/apache/pulsar/client/admin/Topics.java | 16 ++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 26 +++++++++++++++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 14 +++++++++
8 files changed, 156 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index fe5d1056e18..94b2090c3d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -854,4 +854,8 @@ public abstract class AdminResource extends PulsarWebResource {
&& ((WebApplicationException) realCause).getResponse().getStatus()
== Status.TEMPORARY_REDIRECT.getStatusCode();
}
+
+ protected static String getTopicNotFoundErrorMessage(String topic) {
+ return String.format("Topic %s not found", topic);
+ }
}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f118172045d..7bdefb29098 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -568,6 +568,34 @@ public class PersistentTopicsBase extends AdminResource {
validateClientVersion();
}
return metadata;
+}
+
+ protected CompletableFuture<Map<String, String>> internalGetPropertiesAsync(boolean authoritative) {
+ return validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_METADATA))
+ .thenCompose(__ -> {
+ if (topicName.isPartitioned()) {
+ return getPropertiesAsync();
+ }
+ return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
+ .thenCompose(metadata -> {
+ if (metadata.partitions == 0) {
+ return getPropertiesAsync();
+ }
+ return CompletableFuture.completedFuture(metadata.properties);
+ });
+ });
+ }
+
+ private CompletableFuture<Map<String, String>> getPropertiesAsync() {
+ return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
+ .thenApply(opt -> {
+ if (!opt.isPresent()) {
+ throw new RestException(Status.NOT_FOUND,
+ getTopicNotFoundErrorMessage(topicName.toString()));
+ }
+ return ((PersistentTopic) opt.get()).getManagedLedger().getProperties();
+ });
}
protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 754f54ca515..c0f69f3270c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -861,6 +861,40 @@ public class PersistentTopics extends PersistentTopicsBase {
return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
}
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/properties")
+ @ApiOperation(value = "Get topic properties.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 412, message = "Topic name is invalid"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ public void getProperties(
+ @Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Is authentication required to perform this operation")
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ validatePersistentTopicName(tenant, namespace, encodedTopic);
+ internalGetPropertiesAsync(authoritative)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get topic {} properties", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
@DELETE
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Delete a partitioned topic.",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 3a2385722e9..6fc65fb45de 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -858,6 +858,27 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
assertEquals(topicsInNs.size(), 0);
}
+ @Test
+ public void testCreateAndGetTopicProperties() throws Exception {
+ final String namespace = "prop-xyz/ns2";
+ final String nonPartitionedTopicName = "persistent://" + namespace + "/non-partitioned-TopicProperties";
+ admin.namespaces().createNamespace(namespace, 20);
+ Map<String, String> nonPartitionedTopicProperties = new HashMap<>();
+ nonPartitionedTopicProperties.put("key1", "value1");
+ admin.topics().createNonPartitionedTopic(nonPartitionedTopicName, nonPartitionedTopicProperties);
+ Map<String, String> properties11 = admin.topics().getProperties(nonPartitionedTopicName);
+ Assert.assertNotNull(properties11);
+ Assert.assertEquals(properties11.get("key1"), "value1");
+
+ final String partitionedTopicName = "persistent://" + namespace + "/partitioned-TopicProperties";
+ Map<String, String> partitionedTopicProperties = new HashMap<>();
+ partitionedTopicProperties.put("key2", "value2");
+ admin.topics().createPartitionedTopic(partitionedTopicName, 2, partitionedTopicProperties);
+ Map<String, String> properties22 = admin.topics().getProperties(partitionedTopicName);
+ Assert.assertNotNull(properties22);
+ Assert.assertEquals(properties22.get("key2"), "value2");
+ }
+
@Test
public void testNonPersistentTopics() throws Exception {
final String namespace = "prop-xyz/ns2";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index f75d41be675..b8837e5d686 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -424,6 +424,13 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
PartitionedTopicMetadata pMetadata2 = persistentTopics.getPartitionedMetadata(
testTenant, testNamespace, topicName2, true, false);
Assert.assertNull(pMetadata2.properties);
+ AsyncResponse metaResponse = mock(AsyncResponse.class);
+ ArgumentCaptor<Map> metaResponseCaptor2 = ArgumentCaptor.forClass(Map.class);
+ persistentTopics.getProperties(metaResponse,
+ testTenant, testNamespace, topicName2, true);
+ verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor2.capture());
+ Assert.assertNotNull(metaResponseCaptor2.getValue());
+ Assert.assertEquals(metaResponseCaptor2.getValue().get("key1"), "value1");
}
@Test
@@ -447,6 +454,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(pMetadata2.properties.size(), 1);
Assert.assertEquals(pMetadata2.properties, topicMetadata);
});
+ AsyncResponse response3 = mock(AsyncResponse.class);
+ ArgumentCaptor<Map> metaResponseCaptor2 = ArgumentCaptor.forClass(Map.class);
+ persistentTopics.getProperties(response3, testTenant, testNamespace, topicName2, true);
+ verify(response3, timeout(5000).times(1)).resume(metaResponseCaptor2.capture());
+ Assert.assertNotNull(metaResponseCaptor2.getValue());
+ Assert.assertEquals(metaResponseCaptor2.getValue().get("key1"), "value1");
}
@Test
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index eab3b8041f3..73f9a199a1b 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -663,6 +663,22 @@ public interface Topics {
*/
CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(String topic);
+ /**
+ * Get properties of a topic.
+ * @param topic
+ * Topic name
+ * @return Topic properties
+ */
+ Map<String, String> getProperties(String topic) throws PulsarAdminException;
+
+ /**
+ * Get properties of a topic asynchronously.
+ * @param topic
+ * Topic name
+ * @return a future that can be used to track when the topic properties are returned
+ */
+ CompletableFuture<Map<String, String>> getPropertiesAsync(String topic);
+
/**
* Delete a partitioned topic.
* <p/>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index a5f5edae290..dbdaffd9e5c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -448,6 +448,32 @@ public class TopicsImpl extends BaseResource implements Topics {
return future;
}
+ @Override
+ public Map<String, String> getProperties(String topic) throws PulsarAdminException {
+ return sync(() -> getPropertiesAsync(topic));
+ }
+
+ @Override
+ public CompletableFuture<Map<String, String>> getPropertiesAsync(String topic) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "properties");
+ final CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Map<String, String>>() {
+
+ @Override
+ public void completed(Map<String, String> response) {
+ future.complete(response);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
@Override
public void deletePartitionedTopic(String topic) throws PulsarAdminException {
deletePartitionedTopic(topic, false);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 7c3736616e9..8fc8d5646d7 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -118,6 +118,7 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("create", new CreateNonPartitionedCmd());
jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd());
jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd());
+ jcommander.addCommand("get-properties", new GetPropertiesCmd());
jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd());
jcommander.addCommand("peek-messages", new PeekMessages());
@@ -591,6 +592,19 @@ public class CmdTopics extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get the topic properties.")
+ private class GetPropertiesCmd extends CliCommand {
+
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws Exception {
+ String topic = validateTopicName(params);
+ print(getTopics().getProperties(topic));
+ }
+ }
+
@Parameters(commandDescription = "Delete a partitioned topic. "
+ "It will also delete all the partitions of the topic if it exists.")
private class DeletePartitionedCmd extends CliCommand {