You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/21 07:39:18 UTC

[pulsar] branch master updated: PIP-105: new API to get subscription properties (#16095)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new face8bb0ab4 PIP-105: new API to get subscription properties (#16095)
face8bb0ab4 is described below

commit face8bb0ab48d09d5aa293cd9bbdab66232207da
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Tue Jun 21 09:39:10 2022 +0200

    PIP-105: new API to get subscription properties (#16095)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 114 +++++++++++++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  36 +++++++
 .../broker/admin/AdminApiSubscriptionTest.java     |  36 +++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  16 +++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  29 ++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   4 +
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  20 ++++
 .../pulsar/tests/integration/cli/CLITest.java      |  26 +++++
 8 files changed, 281 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8fb47b72ae0..9d63b5e5ee3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1614,6 +1614,35 @@ public class PersistentTopicsBase extends AdminResource {
                 });
     }
 
+    /** Resumes the response with the properties of subscription {@code subName} on this single topic. */
+    private void internalGetSubscriptionPropertiesForNonPartitionedTopic(AsyncResponse asyncResponse,
+                                                                            String subName, boolean authoritative) {
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenApply((Topic topic) -> {
+                    Subscription sub = topic.getSubscription(subName);
+                    if (sub == null) {
+                        throw new RestException(Status.NOT_FOUND,
+                                getSubNotFoundErrorMessage(topicName.toString(), subName));
+                    }
+                    return sub.getSubscriptionProperties();
+                }).thenAccept((Map<String, String> properties) -> {
+                    // A subscription without properties is reported as an empty map, never as null.
+                    Map<String, String> result = properties != null ? properties : Collections.emptyMap();
+                    asyncResponse.resume(Response.ok(result).build());
+                }).exceptionally(ex -> {
+                    // Stage failures normally arrive wrapped; fall back to ex itself when there is no cause.
+                    Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
+                    if (!isRedirectException(ex)) { // redirects are not logged as errors
+                        log.error("[{}] Failed to get properties for subscription {} {}",
+                                clientAppId(), topicName, subName, cause);
+                    }
+                    asyncResponse.resume(new RestException(cause));
+                    return null;
+                });
+    }
+
     protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
                                                         String subName, boolean authoritative) {
         CompletableFuture<Void> future;
@@ -2422,6 +2451,91 @@ public class PersistentTopicsBase extends AdminResource {
         });
     }
 
+    /**
+     * Resumes {@code asyncResponse} with the properties of subscription {@code subName} on {@code topicName}.
+     * For a partitioned topic every partition is queried and the returned maps are merged into one.
+     */
+    protected void internalGetSubscriptionProperties(AsyncResponse asyncResponse, String subName,
+                                                        boolean authoritative) {
+        CompletableFuture<Void> future = topicName.isGlobal()
+                ? validateGlobalNamespaceOwnershipAsync(namespaceName)
+                : CompletableFuture.completedFuture(null);
+
+        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
+            // NOTE(review): isPartitioned() presumably means topicName addresses one specific partition - confirm.
+            if (topicName.isPartitioned()) {
+                internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName, authoritative);
+            } else {
+                getPartitionedTopicMetadataAsync(topicName,
+                        authoritative, false).thenAcceptAsync(partitionMetadata -> {
+                    if (partitionMetadata.partitions > 0) {
+                        final List<CompletableFuture<Map<String, String>>> futures = Lists.newArrayList();
+
+                        // Ask every partition for its properties through the admin client.
+                        for (int i = 0; i < partitionMetadata.partitions; i++) {
+                            TopicName topicNamePartition = topicName.getPartition(i);
+                            try {
+                                futures.add(pulsar().getAdminClient().topics()
+                                        .getSubscriptionPropertiesAsync(topicNamePartition.toString(), subName));
+                            } catch (Exception e) {
+                                log.error("[{}] Failed to get properties for subscription {} {}",
+                                        clientAppId(), topicNamePartition, subName, e);
+                                asyncResponse.resume(new RestException(e));
+                                return;
+                            }
+                        }
+
+                        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                            if (exception != null) {
+                                Throwable t = exception.getCause();
+                                if (t instanceof NotFoundException) {
+                                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                            getSubNotFoundErrorMessage(topicName.toString(), subName)));
+                                } else {
+                                    log.error("[{}] Failed to get properties for subscription {} {}",
+                                            clientAppId(), topicName, subName, t);
+                                    asyncResponse.resume(new RestException(t));
+                                }
+                                return null;
+                            }
+
+                            Map<String, String> aggregatedResult = new HashMap<>();
+                            // in theory all the partitions have the same properties
+                            for (CompletableFuture<Map<String, String>> f : futures) {
+                                try {
+                                    aggregatedResult.putAll(f.get());
+                                } catch (Exception impossible) {
+                                    // we already waited for this Future; bail out so we do not resume twice
+                                    asyncResponse.resume(new RestException(impossible));
+                                    return null;
+                                }
+                            }
+
+                            asyncResponse.resume(Response.ok(aggregatedResult).build());
+                            return null;
+                        });
+                    } else {
+                        internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
+                                authoritative);
+                    }
+                }, pulsar().getExecutor()).exceptionally(ex -> {
+                    log.error("[{}] Failed to get properties for subscription {} from topic {}",
+                            clientAppId(), subName, topicName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+            }
+        }).exceptionally(ex -> {
+            // If the exception is not redirect exception we need to log it.
+            if (!isRedirectException(ex)) {
+                log.error("[{}] Failed to get properties for subscription {} from topic {}",
+                        clientAppId(), subName, topicName, ex);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+            return null;
+        });
+    }
+
     protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
             MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
         CompletableFuture<Void> ret;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index daadb9894bc..d0033d91e68 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1598,6 +1598,42 @@ public class PersistentTopics extends PersistentTopicsBase {
         }
     }
 
+    /** Returns the properties of a subscription; every outcome is delivered through {@code asyncResponse}. */
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/properties")
+    @ApiOperation(value = "Return all the properties on the given subscription")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+                    + " subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
+            @ApiResponse(code = 405, message = "Method Not Allowed"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
+    })
+    public void getSubscriptionProperties(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription", required = true)
+            @PathParam("subName") String encodedSubName,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalGetSubscriptionProperties(asyncResponse, decode(encodedSubName), authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor")
     @ApiOperation(value = "Reset subscription to message position closest to given position.",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
index 67416b4288e..411565521d9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
@@ -152,6 +152,9 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
                 SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
                         .getSubscriptions().get(subscriptionName);
                 assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+                Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName);
+                assertEquals(value, props.get("foo"));
             }
 
             // properties are never null, but an empty map
@@ -159,6 +162,9 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
                 SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
                         .getSubscriptions().get(subscriptionName2);
                 assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+
+                Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName2);
+                assertTrue(props.isEmpty());
             }
 
             // aggregated properties
@@ -166,12 +172,21 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
                     .getSubscriptions().get(subscriptionName);
             assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
 
+            Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+            assertEquals(value, props.get("foo"));
+
         } else {
             SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
             assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
 
+            Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+            assertEquals(value, props.get("foo"));
+
             SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2);
             assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
+
+            Map<String, String> props2 = admin.topics().getSubscriptionProperties(topic, subscriptionName2);
+            assertTrue(props2.isEmpty());
         }
 
         // clear the properties on subscriptionName
@@ -183,6 +198,9 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
                 SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
                         .getSubscriptions().get(subscriptionName);
                 assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+
+                Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName);
+                assertTrue(props.isEmpty());
             }
 
             // aggregated properties
@@ -190,9 +208,15 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
                     .getSubscriptions().get(subscriptionName);
             assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
 
+            Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+            assertTrue(props.isEmpty());
+
         } else {
             SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
             assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+
+            Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+            assertTrue(props.isEmpty());
         }
 
         // update the properties on subscriptionName
@@ -204,6 +228,9 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
                 SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
                         .getSubscriptions().get(subscriptionName);
                 assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+                Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName);
+                assertEquals(value, props.get("foo"));
             }
 
             // aggregated properties
@@ -211,12 +238,21 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
                     .getSubscriptions().get(subscriptionName);
             assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
 
+            Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+            assertEquals(value, props.get("foo"));
+
         } else {
             SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
             assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
 
+            Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+            assertEquals(value, props.get("foo"));
+
             SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2);
             assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
+
+            Map<String, String> props2 = admin.topics().getSubscriptionProperties(topic, subscriptionName2);
+            assertTrue(props2.isEmpty());
         }
 
     }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index bcacca00e59..a4486d846b1 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1839,6 +1839,15 @@ public interface Topics {
     void updateSubscriptionProperties(String topic, String subName, Map<String, String> subscriptionProperties)
             throws PulsarAdminException;
 
+    /**
+     * Get the properties of a subscription on a topic, as a map of property name to value.
+     * @param topic topic name
+     * @param subName subscription name
+     * @throws PulsarAdminException if the properties cannot be retrieved
+     */
+    Map<String, String> getSubscriptionProperties(String topic, String subName)
+            throws PulsarAdminException;
+
     /**
      * Reset cursor position on a topic subscription.
      *
@@ -1873,6 +1882,13 @@ public interface Topics {
     CompletableFuture<Void>  updateSubscriptionPropertiesAsync(String topic, String subName,
                                                                Map<String, String> subscriptionProperties);
 
+    /**
+     * Asynchronously get the properties of a subscription; the future completes with the property map.
+     * @param topic topic name
+     * @param subName subscription name
+     */
+    CompletableFuture<Map<String, String>> getSubscriptionPropertiesAsync(String topic, String subName);
+
     /**
      * Reset cursor position on a topic subscription.
      *
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 938b85f1920..909b8f74ea5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1213,6 +1213,12 @@ public class TopicsImpl extends BaseResource implements Topics {
         sync(() -> updateSubscriptionPropertiesAsync(topic, subName, subscriptionProperties));
     }
 
+    // Synchronous counterpart of getSubscriptionPropertiesAsync, bridged through sync().
+    @Override
+    public Map<String, String> getSubscriptionProperties(String topic, String subName) throws PulsarAdminException {
+        return sync(() -> getSubscriptionPropertiesAsync(topic, subName));
+    }
+
     @Override
     public CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName,
                                                                      Map<String, String> subscriptionProperties) {
@@ -1226,6 +1232,29 @@ public class TopicsImpl extends BaseResource implements Topics {
         return asyncPutRequest(path, Entity.entity(subscriptionProperties, MediaType.APPLICATION_JSON));
     }
 
+    /**
+     * Issues the GET on {@code .../subscription/{subName}/properties} and exposes the callback as a future.
+     */
+    @Override
+    public CompletableFuture<Map<String, String>> getSubscriptionPropertiesAsync(String topic, String subName) {
+        final TopicName validated = validateTopic(topic);
+        final WebTarget target = topicPath(validated, "subscription", Codec.encode(subName), "properties");
+        final CompletableFuture<Map<String, String>> result = new CompletableFuture<>();
+        asyncGetRequest(target, new InvocationCallback<Map<String, String>>() {
+            @Override
+            public void completed(Map<String, String> properties) {
+                result.complete(properties);
+            }
+
+            @Override
+            public void failed(Throwable error) {
+                // the cause of the reported failure is what gets converted by getApiException
+                result.completeExceptionally(getApiException(error.getCause()));
+            }
+        });
+        return result;
+    }
+
     @Override
     public void resetCursor(String topic, String subName, MessageId messageId
             , boolean isExcluded) throws PulsarAdminException {
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 324c18deb38..74f12f0762e 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1494,6 +1494,10 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 --clear"));
         verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", new HashMap<>());
 
+        cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("get-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1"));
+        verify(mockTopics).getSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1");
+
         cmdTopics = new CmdTopics(() -> admin);
         props = new HashMap<>();
         props.put("a", "b");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index bff613f4ce3..23cffd4e3a2 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.util.DateFormatter;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
 
 @Getter
@@ -98,6 +99,7 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("unsubscribe", new DeleteSubscription());
         jcommander.addCommand("create-subscription", new CreateSubscription());
         jcommander.addCommand("update-subscription-properties", new UpdateSubscriptionProperties());
+        jcommander.addCommand("get-subscription-properties", new GetSubscriptionProperties());
 
         jcommander.addCommand("stats", new GetStats());
         jcommander.addCommand("stats-internal", new GetInternalStats());
@@ -1006,6 +1008,24 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get the properties of a subscription on a topic")
+    private class GetSubscriptionProperties extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-s", "--subscription" }, description = "Subscription to describe", required = true)
+        private String subscriptionName;
+
+        /** Fetches the subscription properties and prints them to standard output as JSON. */
+        @Override
+        void run() throws Exception {
+            String topicName = validateTopicName(params);
+            Map<String, String> properties = getTopics().getSubscriptionProperties(topicName, subscriptionName);
+            // Serialize through the object mapper so the output is JSON and not Map.toString()
+            System.out.println(ObjectMapperFactory.getThreadLocal().writeValueAsString(properties));
+        }
+    }
+
 
     @Parameters(commandDescription = "Reset position for subscription to a position that is closest to "
             + "timestamp or messageId.")
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index 6c00314a7e5..a10c6ed3dd6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -207,6 +207,18 @@ public class CLITest extends PulsarTestSuite {
             );
             resultUpdate.assertNoOutput();
 
+            ContainerExecResult resultGet = container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "topics",
+                    "get-subscription-properties",
+                    "persistent://public/default/" + topic,
+                    "--subscription",
+                    "" + subscriptionPrefix + i
+            );
+            assertEquals(
+                    resultGet.getStdout().trim(), "{\"a\":\"e\"}",
+                    "unexpected output " + resultGet.getStdout() + " - error " + resultGet.getStderr());
+
             ContainerExecResult resultClear = container.execCmd(
                     PulsarCluster.ADMIN_SCRIPT,
                     "topics",
@@ -217,6 +229,20 @@ public class CLITest extends PulsarTestSuite {
                     "" + subscriptionPrefix + i
             );
             resultClear.assertNoOutput();
+
+            ContainerExecResult resultGetAfterClear = container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "topics",
+                    "get-subscription-properties",
+                    "persistent://public/default/" + topic,
+                    "--subscription",
+                    "" + subscriptionPrefix + i
+            );
+            assertEquals(
+                    resultGetAfterClear.getStdout().trim(), "{}",
+                    "unexpected output " + resultGetAfterClear.getStdout()
+                            + " - error " + resultGetAfterClear.getStderr());
+
             i++;
         }
     }