You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/12 11:26:12 UTC

[pulsar] 02/03: [feature][admin] Support to get topic properties. (#15944)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6648f996ffd3c0dae243640e7abaac51438143f8
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Jun 8 12:34:21 2022 +0800

    [feature][admin] Support to get topic properties. (#15944)
    
    Since https://github.com/apache/pulsar/pull/12818 added support for creating topics with metadata, this patch adds a `get` API for retrieving topic properties.
    
    (cherry picked from commit 1ebe4ee00cedbc1c9d955d93aaa9ab86e7bf8092)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  4 +++
 .../broker/admin/impl/PersistentTopicsBase.java    | 28 ++++++++++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 34 ++++++++++++++++++++++
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 21 +++++++++++++
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 13 +++++++++
 .../org/apache/pulsar/client/admin/Topics.java     | 16 ++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 26 +++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 14 +++++++++
 8 files changed, 156 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index fe5d1056e18..94b2090c3d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -854,4 +854,8 @@ public abstract class AdminResource extends PulsarWebResource {
                 && ((WebApplicationException) realCause).getResponse().getStatus()
                 == Status.TEMPORARY_REDIRECT.getStatusCode();
     }
+
+    protected static String getTopicNotFoundErrorMessage(String topic) {
+        return String.format("Topic %s not found", topic);
+    }
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f118172045d..7bdefb29098 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -568,6 +568,34 @@ public class PersistentTopicsBase extends AdminResource {
             validateClientVersion();
         }
         return metadata;
+}
+
+    protected CompletableFuture<Map<String, String>> internalGetPropertiesAsync(boolean authoritative) {
+        return validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_METADATA))
+                .thenCompose(__ -> {
+                    if (topicName.isPartitioned()) {
+                        return getPropertiesAsync();
+                    }
+                    return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
+                            .thenCompose(metadata -> {
+                                if (metadata.partitions == 0) {
+                                    return getPropertiesAsync();
+                                }
+                                return CompletableFuture.completedFuture(metadata.properties);
+                            });
+                });
+    }
+
+    private CompletableFuture<Map<String, String>> getPropertiesAsync() {
+        return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
+                .thenApply(opt -> {
+                    if (!opt.isPresent()) {
+                        throw new RestException(Status.NOT_FOUND,
+                                getTopicNotFoundErrorMessage(topicName.toString()));
+                    }
+                    return ((PersistentTopic) opt.get()).getManagedLedger().getProperties();
+        });
     }
 
     protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 754f54ca515..c0f69f3270c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -861,6 +861,40 @@ public class PersistentTopics extends PersistentTopicsBase {
         return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/properties")
+    @ApiOperation(value = "Get topic properties.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Topic name is invalid"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    public void getProperties(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        validatePersistentTopicName(tenant, namespace, encodedTopic);
+        internalGetPropertiesAsync(authoritative)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get topic {} properties", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
     @DELETE
     @Path("/{tenant}/{namespace}/{topic}/partitions")
     @ApiOperation(value = "Delete a partitioned topic.",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 3a2385722e9..6fc65fb45de 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -858,6 +858,27 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
         assertEquals(topicsInNs.size(), 0);
     }
 
+    @Test
+    public void testCreateAndGetTopicProperties() throws Exception {
+        final String namespace = "prop-xyz/ns2";
+        final String nonPartitionedTopicName = "persistent://" + namespace + "/non-partitioned-TopicProperties";
+        admin.namespaces().createNamespace(namespace, 20);
+        Map<String, String> nonPartitionedTopicProperties = new HashMap<>();
+        nonPartitionedTopicProperties.put("key1", "value1");
+        admin.topics().createNonPartitionedTopic(nonPartitionedTopicName, nonPartitionedTopicProperties);
+        Map<String, String> properties11 = admin.topics().getProperties(nonPartitionedTopicName);
+        Assert.assertNotNull(properties11);
+        Assert.assertEquals(properties11.get("key1"), "value1");
+
+        final String partitionedTopicName = "persistent://" + namespace + "/partitioned-TopicProperties";
+        Map<String, String> partitionedTopicProperties = new HashMap<>();
+        partitionedTopicProperties.put("key2", "value2");
+        admin.topics().createPartitionedTopic(partitionedTopicName, 2, partitionedTopicProperties);
+        Map<String, String> properties22 = admin.topics().getProperties(partitionedTopicName);
+        Assert.assertNotNull(properties22);
+        Assert.assertEquals(properties22.get("key2"), "value2");
+    }
+
     @Test
     public void testNonPersistentTopics() throws Exception {
         final String namespace = "prop-xyz/ns2";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index f75d41be675..b8837e5d686 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -424,6 +424,13 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         PartitionedTopicMetadata pMetadata2 = persistentTopics.getPartitionedMetadata(
                 testTenant, testNamespace, topicName2, true, false);
         Assert.assertNull(pMetadata2.properties);
+        AsyncResponse metaResponse = mock(AsyncResponse.class);
+        ArgumentCaptor<Map> metaResponseCaptor2 = ArgumentCaptor.forClass(Map.class);
+        persistentTopics.getProperties(metaResponse,
+                testTenant, testNamespace, topicName2, true);
+        verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor2.capture());
+        Assert.assertNotNull(metaResponseCaptor2.getValue());
+        Assert.assertEquals(metaResponseCaptor2.getValue().get("key1"), "value1");
     }
 
     @Test
@@ -447,6 +454,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
             Assert.assertEquals(pMetadata2.properties.size(), 1);
             Assert.assertEquals(pMetadata2.properties, topicMetadata);
         });
+        AsyncResponse response3 = mock(AsyncResponse.class);
+        ArgumentCaptor<Map> metaResponseCaptor2 = ArgumentCaptor.forClass(Map.class);
+        persistentTopics.getProperties(response3, testTenant, testNamespace, topicName2, true);
+        verify(response3, timeout(5000).times(1)).resume(metaResponseCaptor2.capture());
+        Assert.assertNotNull(metaResponseCaptor2.getValue());
+        Assert.assertEquals(metaResponseCaptor2.getValue().get("key1"), "value1");
     }
 
     @Test
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index eab3b8041f3..73f9a199a1b 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -663,6 +663,22 @@ public interface Topics {
      */
     CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(String topic);
 
+    /**
+     * Get properties of a topic.
+     * @param topic
+     *            Topic name
+     * @return Topic properties
+     */
+    Map<String, String> getProperties(String topic) throws PulsarAdminException;
+
+    /**
+     * Get properties of a topic asynchronously.
+     * @param topic
+     *            Topic name
+     * @return a future that can be used to track when the topic properties are returned
+     */
+    CompletableFuture<Map<String, String>> getPropertiesAsync(String topic);
+
     /**
      * Delete a partitioned topic.
      * <p/>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index a5f5edae290..dbdaffd9e5c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -448,6 +448,32 @@ public class TopicsImpl extends BaseResource implements Topics {
         return future;
     }
 
+    @Override
+    public Map<String, String> getProperties(String topic) throws PulsarAdminException {
+        return sync(() -> getPropertiesAsync(topic));
+    }
+
+    @Override
+    public CompletableFuture<Map<String, String>> getPropertiesAsync(String topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "properties");
+        final CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Map<String, String>>() {
+
+                    @Override
+                    public void completed(Map<String, String> response) {
+                        future.complete(response);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
     @Override
     public void deletePartitionedTopic(String topic) throws PulsarAdminException {
         deletePartitionedTopic(topic, false);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 7c3736616e9..8fc8d5646d7 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -118,6 +118,7 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("create", new CreateNonPartitionedCmd());
         jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd());
         jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd());
+        jcommander.addCommand("get-properties", new GetPropertiesCmd());
 
         jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd());
         jcommander.addCommand("peek-messages", new PeekMessages());
@@ -591,6 +592,19 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get the topic properties.")
+    private class GetPropertiesCmd extends CliCommand {
+
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(params);
+            print(getTopics().getProperties(topic));
+        }
+    }
+
     @Parameters(commandDescription = "Delete a partitioned topic. "
             + "It will also delete all the partitions of the topic if it exists.")
     private class DeletePartitionedCmd extends CliCommand {