You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/03 11:44:26 UTC

[pulsar] branch master updated: PIP-105 add support for updating the Subscription properties (#15751)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e77e88cc54 PIP-105 add support for updating the Subscription properties (#15751)
8e77e88cc54 is described below

commit 8e77e88cc54c05fa4f8b360be499c3da61607a66
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Fri Jun 3 13:44:16 2022 +0200

    PIP-105 add support for updating the Subscription properties (#15751)
    
    * PIP-105 add support for updating the Subscription properties
    
    * Implement command update-subscription-properties
    
    * Add tests
    
    * Add volatile
    
    * Fix PersistentTopicTest
    
    * PIP-105: Store Subscription properties
    
    * Fix FilterEntryTest
    
    * Add volatile
    
    * Fix PersistentTopicTest
    
    * fix ServerCnxTest test
    
    * Switch from POST to PUT
    
    * rename to /properties
    
    * Apply suggestions from code review
    
    Co-authored-by: Lari Hotari <lh...@users.noreply.github.com>
    
    Co-authored-by: Lari Hotari <lh...@users.noreply.github.com>
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 106 +++++++++++++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  37 +++++++
 .../broker/admin/AdminApiSubscriptionTest.java     |  45 +++++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  21 ++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  19 ++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  11 +++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  37 +++++++
 .../pulsar/tests/integration/cli/CLITest.java      |  25 ++++-
 8 files changed, 300 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 2f3312c4ae2..7603e0c2e43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1560,6 +1560,33 @@ public class PersistentTopicsBase extends AdminResource {
             });
     }
 
+    private void internalUpdateSubscriptionPropertiesForNonPartitionedTopic(AsyncResponse asyncResponse,
+                                      String subName, Map<String, String> subscriptionProperties,
+                                      boolean authoritative) {
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
+                .thenCompose(__ -> {
+                    Topic topic = getTopicReference(topicName);
+                    Subscription sub = topic.getSubscription(subName);
+                    if (sub == null) {
+                        throw new RestException(Status.NOT_FOUND,
+                                getSubNotFoundErrorMessage(topicName.toString(), subName));
+                    }
+                    return sub.updateSubscriptionProperties(subscriptionProperties);
+                }).thenRun(() -> {
+                    log.info("[{}][{}] Updated subscription {}", clientAppId(), topicName, subName);
+                    asyncResponse.resume(Response.noContent().build());
+                }).exceptionally(ex -> {
+                    Throwable cause = ex.getCause();
+                    // If the exception is not redirect exception we need to log it.
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to update subscription {} {}", clientAppId(), topicName, subName, cause);
+                    }
+                    asyncResponse.resume(new RestException(cause));
+                    return null;
+                });
+    }
+
     protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
                                                         String subName, boolean authoritative) {
         CompletableFuture<Void> future;
@@ -2289,6 +2316,85 @@ public class PersistentTopicsBase extends AdminResource {
         });
     }
 
+    /**
+     * Replaces all properties of a subscription, fanning out to every partition of a partitioned topic.
+     */
+    protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse, String subName,
+                                                        Map<String, String> subscriptionProperties,
+                                                        boolean authoritative) {
+        CompletableFuture<Void> future = topicName.isGlobal()
+                ? validateGlobalNamespaceOwnershipAsync(namespaceName)
+                : CompletableFuture.completedFuture(null);
+
+        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
+            if (topicName.isPartitioned()) {
+                // NOTE(review): presumably true when topicName already names a single partition - confirm.
+                internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
+                        subscriptionProperties, authoritative);
+            } else {
+                getPartitionedTopicMetadataAsync(topicName,
+                        authoritative, false).thenAcceptAsync(partitionMetadata -> {
+                    if (partitionMetadata.partitions > 0) {
+                        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                        // Issue one admin-client update per partition and collect the futures.
+                        for (int i = 0; i < partitionMetadata.partitions; i++) {
+                            TopicName topicNamePartition = topicName.getPartition(i);
+                            try {
+                                futures.add(pulsar().getAdminClient().topics()
+                                        .updateSubscriptionPropertiesAsync(topicNamePartition.toString(),
+                                                subName, subscriptionProperties));
+                            } catch (Exception e) {
+                                log.error("[{}] Failed to update properties for subscription {} {}",
+                                        clientAppId(), topicNamePartition, subName, e);
+                                asyncResponse.resume(new RestException(e));
+                                return;
+                            }
+                        }
+                        // Reply once every partition call has completed, mapping failures to an HTTP status.
+                        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                            if (exception != null) {
+                                Throwable t = exception.getCause();
+                                if (t instanceof NotFoundException) {
+                                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                            getSubNotFoundErrorMessage(topicName.toString(), subName)));
+                                    return null;
+                                } else if (t instanceof PreconditionFailedException) {
+                                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                                            "Precondition failed while updating subscription properties"));
+                                    return null;
+                                } else {
+                                    log.error("[{}] Failed to update properties for subscription {} {}",
+                                            clientAppId(), topicName, subName, t);
+                                    asyncResponse.resume(new RestException(t));
+                                    return null;
+                                }
+                            }
+
+                            asyncResponse.resume(Response.noContent().build());
+                            return null;
+                        });
+                    } else {
+                        internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
+                                subscriptionProperties, authoritative);
+                    }
+                }, pulsar().getExecutor()).exceptionally(ex -> {
+                    log.error("[{}] Failed to update properties for subscription {} from topic {}",
+                            clientAppId(), subName, topicName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+            }
+        }).exceptionally(ex -> {
+            // If the exception is not redirect exception we need to log it.
+            if (!isRedirectException(ex)) {
+                log.error("[{}] Failed to update subscription {} from topic {}",
+                        clientAppId(), subName, topicName, ex);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+            return null;
+        });
+    }
+
     protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
             MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
         CompletableFuture<Void> ret;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 4cd405d3e9a..3261e84b505 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1527,6 +1527,43 @@ public class PersistentTopics extends PersistentTopicsBase {
         }
     }
 
+    /**
+     * REST endpoint that replaces every property stored on a subscription with the supplied map.
+     * Failures raised while validating or dispatching are reported through {@code asyncResponse}.
+     */
+    @PUT
+    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/properties")
+    @ApiOperation(value = "Replaces all the properties on the given subscription")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+                    + " subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
+            @ApiResponse(code = 405, message = "Method Not Allowed"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
+    })
+    public void updateSubscriptionProperties(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true) @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true) @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription to update", required = true) @PathParam("subName") String encodedSubName,
+            @ApiParam(value = "The new properties") Map<String, String> subscriptionProperties,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalUpdateSubscriptionProperties(asyncResponse, decode(encodedSubName),
+                    subscriptionProperties, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor")
     @ApiOperation(value = "Reset subscription to message position closest to given position.",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
index 2c37235ebdb..67416b4288e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
@@ -174,5 +174,50 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
             assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
         }
 
+        // clear the properties on subscriptionName
+        admin.topics().updateSubscriptionProperties(topic, subscriptionName, new HashMap<>());
+
+        if (partitioned) {
+            PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic);
+            for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
+                SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
+                        .getSubscriptions().get(subscriptionName);
+                assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+            }
+
+            // aggregated properties
+            SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false)
+                    .getSubscriptions().get(subscriptionName);
+            assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+
+        } else {
+            SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+            assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+        }
+
+        // update the properties on subscriptionName
+        admin.topics().updateSubscriptionProperties(topic, subscriptionName, properties);
+
+        if (partitioned) {
+            PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic);
+            for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
+                SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
+                        .getSubscriptions().get(subscriptionName);
+                assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+            }
+
+            // aggregated properties
+            SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false)
+                    .getSubscriptions().get(subscriptionName);
+            assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+        } else {
+            SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+            assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+            SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2);
+            assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
+        }
+
     }
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 48ef03cd67e..d07c67e11ac 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1812,6 +1812,17 @@ public interface Topics {
      */
     void resetCursor(String topic, String subName, MessageId messageId, boolean isExcluded) throws PulsarAdminException;
 
+    /**
+     * Update Subscription Properties on a topic subscription.
+     * The new properties will override the existing values, properties that are not passed will be removed.
+     * @param topic topic name
+     * @param subName subscription name
+     * @param subscriptionProperties the complete new set of properties for the subscription
+     * @throws PulsarAdminException if the update cannot be performed
+     */
+    void updateSubscriptionProperties(String topic, String subName, Map<String, String> subscriptionProperties)
+            throws PulsarAdminException;
+
     /**
      * Reset cursor position on a topic subscription.
      *
@@ -1836,6 +1847,16 @@ public interface Topics {
      */
     CompletableFuture<Void> resetCursorAsync(String topic, String subName, MessageId messageId, boolean isExcluded);
 
+    /**
+     * Update Subscription Properties on a topic subscription.
+     * The new properties will override the existing values, properties that are not passed will be removed.
+     * @param topic topic name
+     * @param subName subscription name
+     * @param subscriptionProperties the complete new set of properties for the subscription
+     */
+    CompletableFuture<Void>  updateSubscriptionPropertiesAsync(String topic, String subName,
+                                                               Map<String, String> subscriptionProperties);
+
     /**
      * Reset cursor position on a topic subscription.
      *
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 5314872fc00..d3682a15317 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1181,6 +1181,25 @@ public class TopicsImpl extends BaseResource implements Topics {
         sync(() -> resetCursorAsync(topic, subName, messageId));
     }
 
+    @Override
+    public void updateSubscriptionProperties(String topic, String subName, Map<String, String> subscriptionProperties)
+            throws PulsarAdminException {
+        // Blocking variant: hands the asynchronous call to sync().
+        sync(() -> updateSubscriptionPropertiesAsync(topic, subName, subscriptionProperties));
+    }
+
+    @Override
+    public CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName,
+                                                                     Map<String, String> subscriptionProperties) {
+        final TopicName validated = validateTopic(topic);
+        // The subscription name is encoded before being placed in the request path.
+        final WebTarget target = topicPath(validated, "subscription", Codec.encode(subName), "properties");
+        // A null argument is sent as an empty map.
+        final Map<String, String> payload =
+                subscriptionProperties == null ? new HashMap<>() : subscriptionProperties;
+        return asyncPutRequest(target, Entity.entity(payload, MediaType.APPLICATION_JSON));
+    }
+
     @Override
     public void resetCursor(String topic, String subName, MessageId messageId
             , boolean isExcluded) throws PulsarAdminException {
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 2b46d2ab760..324c18deb38 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1490,6 +1490,17 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest -r"));
         verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, true, null);
 
+        cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 --clear"));
+        verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", new HashMap<>());
+
+        cmdTopics = new CmdTopics(() -> admin);
+        props = new HashMap<>();
+        props.put("a", "b");
+        props.put("c", "d");
+        cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 -p a=b -p c=d"));
+        verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", props);
+
         cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32"));
         verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null);
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 0d27d74b86a..fb78679c5f0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -33,6 +33,7 @@ import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -96,6 +97,7 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("subscriptions", new ListSubscriptions());
         jcommander.addCommand("unsubscribe", new DeleteSubscription());
         jcommander.addCommand("create-subscription", new CreateSubscription());
+        jcommander.addCommand("update-subscription-properties", new UpdateSubscriptionProperties());
 
         jcommander.addCommand("stats", new GetStats());
         jcommander.addCommand("stats-internal", new GetInternalStats());
@@ -956,6 +958,41 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    /**
+     * Sets the properties passed with {@code -p key=value}, or removes them all with {@code --clear}.
+     */
+    @Parameters(commandDescription = "Update the properties of a subscription on a topic")
+    private class UpdateSubscriptionProperties extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"-s", "--subscription"}, description = "Subscription to update", required = true)
+        private String subscriptionName;
+
+        @Parameter(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)",
+                required = false)
+        private java.util.List<String> properties;
+
+        @Parameter(names = {"--clear", "-c"}, description = "Remove all properties", required = false)
+        private boolean clear;
+
+        @Override
+        void run() throws Exception {
+            final String topic = validateTopicName(params);
+            final Map<String, String> parsed = parseListKeyValueMap(properties);
+            final Map<String, String> requested = parsed != null ? parsed : Collections.emptyMap();
+            // Exactly one mode is accepted: either some properties are given, or --clear is set.
+            if (requested.isEmpty() && !clear) {
+                throw new ParameterException("If you want to clear the properties you have to use --clear");
+            }
+            if (!requested.isEmpty() && clear) {
+                throw new ParameterException("If you set --clear then you should not pass any properties");
+            }
+            getTopics().updateSubscriptionProperties(topic, subscriptionName, requested);
+        }
+    }
+
+
     @Parameters(commandDescription = "Reset position for subscription to a position that is closest to "
             + "timestamp or messageId.")
     private class ResetCursor extends CliCommand {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index 66baadf590c..6c00314a7e5 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -174,7 +174,7 @@ public class CLITest extends PulsarTestSuite {
     }
 
     @Test
-    public void testCreateSubscriptionWithPropertiesCommand() throws Exception {
+    public void testCreateUpdateSubscriptionWithPropertiesCommand() throws Exception {
         String topic = "testCreateSubscriptionCommmand";
 
         String subscriptionPrefix = "subscription-";
@@ -194,6 +194,29 @@ public class CLITest extends PulsarTestSuite {
                     "" + subscriptionPrefix + i
             );
             result.assertNoOutput();
+
+            ContainerExecResult resultUpdate = container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "topics",
+                    "update-subscription-properties",
+                    "-p",
+                    "a=e",
+                    "persistent://public/default/" + topic,
+                    "--subscription",
+                    "" + subscriptionPrefix + i
+            );
+            resultUpdate.assertNoOutput();
+
+            ContainerExecResult resultClear = container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "topics",
+                    "update-subscription-properties",
+                    "-c",
+                    "persistent://public/default/" + topic,
+                    "--subscription",
+                    "" + subscriptionPrefix + i
+            );
+            resultClear.assertNoOutput();
             i++;
         }
     }