You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2023/04/23 08:15:47 UTC

[pulsar] branch master updated: [improve][cli] Add `--cleanupSubscription` to pulsar-admin (#20028)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 795d60acc7a [improve][cli] Add `--cleanupSubscription` to pulsar-admin (#20028)
795d60acc7a is described below

commit 795d60acc7a27a2d6bddfc60ffd7c9cf52a91cb4
Author: jiangpengcheng <sc...@gmail.com>
AuthorDate: Sun Apr 23 16:15:35 2023 +0800

    [improve][cli] Add `--cleanupSubscription` to pulsar-admin (#20028)
    
    Co-authored-by: tison <wa...@gmail.com>
---
 .../main/java/org/apache/pulsar/admin/cli/CmdFunctions.java    |  7 +++++++
 .../src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java    |  8 ++++++++
 .../org/apache/pulsar/functions/utils/SinkConfigUtils.java     | 10 ----------
 3 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 05bab9c6f19..91ec7183ff1 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -201,6 +201,9 @@ public class CmdFunctions extends CmdBase {
         protected String className;
         @Parameter(names = { "-t", "--function-type" }, description = "The built-in Pulsar Function type")
         protected String functionType;
+        @Parameter(names = "--cleanup-subscription", description = "Whether delete the subscription "
+                + "when function is deleted")
+        protected Boolean cleanupSubscription;
         @Parameter(names = "--jar", description = "Path to the JAR file for the function "
                 + "(if the function is written in Java). It also supports URL path [http/https/file "
                 + "(file protocol assumes that file already exists on worker host)/function "
@@ -471,6 +474,10 @@ public class CmdFunctions extends CmdBase {
                 }
             }
 
+            if (null != cleanupSubscription) {
+                functionConfig.setCleanupSubscription(cleanupSubscription);
+            }
+
             if (null != inputs) {
                 List<String> inputTopics = Arrays.asList(inputs.split(","));
                 functionConfig.setInputs(inputTopics);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 4af9221dd2e..0b27dd8d0a7 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -291,6 +291,10 @@ public class CmdSinks extends CmdBase {
         @Parameter(names = { "-t", "--sink-type" }, description = "The sinks's connector provider")
         protected String sinkType;
 
+        @Parameter(names = "--cleanup-subscription", description = "Whether delete the subscription "
+                + "when sink is deleted")
+        protected Boolean cleanupSubscription;
+
         @Parameter(names = { "-i",
                 "--inputs" }, description = "The sink's input topic or topics "
                 + "(multiple topics can be specified as a comma-separated list)")
@@ -469,6 +473,10 @@ public class CmdSinks extends CmdBase {
                 sinkConfig.setProcessingGuarantees(processingGuarantees);
             }
 
+            if (null != cleanupSubscription) {
+                sinkConfig.setCleanupSubscription(cleanupSubscription);
+            }
+
             if (retainOrdering != null) {
                 sinkConfig.setRetainOrdering(retainOrdering);
             }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index d79f787588c..1a4009f5295 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -202,12 +202,6 @@ public class SinkConfigUtils {
             sourceSpecBuilder.setNegativeAckRedeliveryDelayMs(sinkConfig.getNegativeAckRedeliveryDelayMs());
         }
 
-        if (sinkConfig.getCleanupSubscription() != null) {
-            sourceSpecBuilder.setCleanupSubscription(sinkConfig.getCleanupSubscription());
-        } else {
-            sourceSpecBuilder.setCleanupSubscription(true);
-        }
-
         if (sinkConfig.getSourceSubscriptionPosition() == SubscriptionInitialPosition.Earliest) {
             sourceSpecBuilder.setSubscriptionPosition(Function.SubscriptionPosition.EARLIEST);
         } else {
@@ -329,7 +323,6 @@ public class SinkConfigUtils {
         // Set subscription position
         sinkConfig.setSourceSubscriptionPosition(
                 convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition()));
-        sinkConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());
 
         if (functionDetails.getSource().getTimeoutMs() != 0) {
             sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
@@ -671,9 +664,6 @@ public class SinkConfigUtils {
         if (!StringUtils.isEmpty(newConfig.getCustomRuntimeOptions())) {
             mergedConfig.setCustomRuntimeOptions(newConfig.getCustomRuntimeOptions());
         }
-        if (newConfig.getCleanupSubscription() != null) {
-            mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription());
-        }
         if (newConfig.getTransformFunction() != null) {
             mergedConfig.setTransformFunction(newConfig.getTransformFunction());
         }