You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2023/04/23 08:15:47 UTC
[pulsar] branch master updated: [improve][cli] Add `--cleanup-subscription` to pulsar-admin (#20028)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 795d60acc7a [improve][cli] Add `--cleanup-subscription` to pulsar-admin (#20028)
795d60acc7a is described below
commit 795d60acc7a27a2d6bddfc60ffd7c9cf52a91cb4
Author: jiangpengcheng <sc...@gmail.com>
AuthorDate: Sun Apr 23 16:15:35 2023 +0800
[improve][cli] Add `--cleanup-subscription` to pulsar-admin (#20028)
Co-authored-by: tison <wa...@gmail.com>
---
.../main/java/org/apache/pulsar/admin/cli/CmdFunctions.java | 7 +++++++
.../src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java | 8 ++++++++
.../org/apache/pulsar/functions/utils/SinkConfigUtils.java | 10 ----------
3 files changed, 15 insertions(+), 10 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 05bab9c6f19..91ec7183ff1 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -201,6 +201,9 @@ public class CmdFunctions extends CmdBase {
protected String className;
@Parameter(names = { "-t", "--function-type" }, description = "The built-in Pulsar Function type")
protected String functionType;
+ @Parameter(names = "--cleanup-subscription", description = "Whether delete the subscription "
+ + "when function is deleted")
+ protected Boolean cleanupSubscription;
@Parameter(names = "--jar", description = "Path to the JAR file for the function "
+ "(if the function is written in Java). It also supports URL path [http/https/file "
+ "(file protocol assumes that file already exists on worker host)/function "
@@ -471,6 +474,10 @@ public class CmdFunctions extends CmdBase {
}
}
+ if (null != cleanupSubscription) {
+ functionConfig.setCleanupSubscription(cleanupSubscription);
+ }
+
if (null != inputs) {
List<String> inputTopics = Arrays.asList(inputs.split(","));
functionConfig.setInputs(inputTopics);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 4af9221dd2e..0b27dd8d0a7 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -291,6 +291,10 @@ public class CmdSinks extends CmdBase {
@Parameter(names = { "-t", "--sink-type" }, description = "The sinks's connector provider")
protected String sinkType;
+ @Parameter(names = "--cleanup-subscription", description = "Whether delete the subscription "
+ + "when sink is deleted")
+ protected Boolean cleanupSubscription;
+
@Parameter(names = { "-i",
"--inputs" }, description = "The sink's input topic or topics "
+ "(multiple topics can be specified as a comma-separated list)")
@@ -469,6 +473,10 @@ public class CmdSinks extends CmdBase {
sinkConfig.setProcessingGuarantees(processingGuarantees);
}
+ if (null != cleanupSubscription) {
+ sinkConfig.setCleanupSubscription(cleanupSubscription);
+ }
+
if (retainOrdering != null) {
sinkConfig.setRetainOrdering(retainOrdering);
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index d79f787588c..1a4009f5295 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -202,12 +202,6 @@ public class SinkConfigUtils {
sourceSpecBuilder.setNegativeAckRedeliveryDelayMs(sinkConfig.getNegativeAckRedeliveryDelayMs());
}
- if (sinkConfig.getCleanupSubscription() != null) {
- sourceSpecBuilder.setCleanupSubscription(sinkConfig.getCleanupSubscription());
- } else {
- sourceSpecBuilder.setCleanupSubscription(true);
- }
-
if (sinkConfig.getSourceSubscriptionPosition() == SubscriptionInitialPosition.Earliest) {
sourceSpecBuilder.setSubscriptionPosition(Function.SubscriptionPosition.EARLIEST);
} else {
@@ -329,7 +323,6 @@ public class SinkConfigUtils {
// Set subscription position
sinkConfig.setSourceSubscriptionPosition(
convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition()));
- sinkConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());
if (functionDetails.getSource().getTimeoutMs() != 0) {
sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
@@ -671,9 +664,6 @@ public class SinkConfigUtils {
if (!StringUtils.isEmpty(newConfig.getCustomRuntimeOptions())) {
mergedConfig.setCustomRuntimeOptions(newConfig.getCustomRuntimeOptions());
}
- if (newConfig.getCleanupSubscription() != null) {
- mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription());
- }
if (newConfig.getTransformFunction() != null) {
mergedConfig.setTransformFunction(newConfig.getTransformFunction());
}