You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/08 04:38:15 UTC
[pulsar] branch master updated: fix: function config cleanupSubscription update bug (#3771)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 925879b fix: function config cleanupSubscription update bug (#3771)
925879b is described below
commit 925879be0691c4daf5cd49c55cfb2b6113e1d31c
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Mar 7 20:38:10 2019 -0800
fix: function config cleanupSubscription update bug (#3771)
* fix: function config cleanupSubscription update bug
* add and fix unit tests
---
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 76 +++++++++++++++++++++-
.../functions/utils/FunctionConfigUtils.java | 4 ++
.../functions/utils/FunctionConfigUtilsTest.java | 28 +++++++-
3 files changed, 106 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index c8a95c1..253d2b6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -1207,10 +1207,18 @@ public class PulsarFunctionE2ETest {
functionConfig.setInputs(Collections.singleton(sourceTopic));
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
functionConfig.setOutput(sinkTopic);
- functionConfig.setCleanupSubscription(true);
+ functionConfig.setCleanupSubscription(false);
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+ retryStrategically((test) -> {
+ try {
+ return admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription();
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+ assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());
retryStrategically((test) -> {
try {
@@ -1222,6 +1230,19 @@ public class PulsarFunctionE2ETest {
// validate pulsar source consumer has started on the topic
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+ // test update cleanup subscription
+ functionConfig.setCleanupSubscription(true);
+ admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+ retryStrategically((test) -> {
+ try {
+ return admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription();
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+ assertTrue(admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());
+
int totalMsgs = 10;
for (int i = 0; i < totalMsgs; i++) {
String data = "my-message-" + i;
@@ -1266,6 +1287,59 @@ public class PulsarFunctionE2ETest {
// make sure subscriptions are cleanup
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
+
+
+ /** test do not cleanup subscription **/
+ functionConfig.setCleanupSubscription(false);
+ admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+ retryStrategically((test) -> {
+ try {
+ return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+ // validate pulsar source consumer has started on the topic
+ assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+ retryStrategically((test) -> {
+ try {
+ FunctionConfig result = admin.functions().getFunction(tenant, namespacePortion, functionName);
+ return result.getParallelism() == 2 && result.getCleanupSubscription() == false;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+ assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());
+
+ // test updating another config and make sure that subscription cleanup remains unchanged
+ functionConfig.setParallelism(2);
+ admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+ retryStrategically((test) -> {
+ try {
+ FunctionConfig result = admin.functions().getFunction(tenant, namespacePortion, functionName);
+ return result.getParallelism() == 2 && result.getCleanupSubscription() == false;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+ assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());
+
+ // delete functions
+ admin.functions().deleteFunction(tenant, namespacePortion, functionName);
+
+ retryStrategically((test) -> {
+ try {
+ return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ // make sure the subscription is NOT cleaned up (cleanupSubscription is false)
+ assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 1e6a85c..7932f58 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -257,6 +257,7 @@ public class FunctionConfigUtils {
functionConfig.setRetainOrdering(false);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
}
+ functionConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());
functionConfig.setAutoAck(functionDetails.getAutoAck());
if (functionDetails.getSource().getTimeoutMs() != 0) {
functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
@@ -694,6 +695,9 @@ public class FunctionConfigUtils {
if (newConfig.getTimeoutMs() != null) {
mergedConfig.setTimeoutMs(newConfig.getTimeoutMs());
}
+ if (newConfig.getCleanupSubscription() != null) {
+ mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription());
+ }
return mergedConfig;
}
}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 0f3a5a3..68dddc2 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -35,6 +35,8 @@ import java.util.Map;
import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
import static org.apache.pulsar.common.functions.FunctionConfig.Runtime.PYTHON;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
/**
* Unit test of {@link Reflections}.
@@ -65,6 +67,8 @@ public class FunctionConfigUtilsTest {
// add default resources
functionConfig.setResources(Resources.getDefaultResources());
+ // set default cleanupSubscription config
+ functionConfig.setCleanupSubscription(true);
assertEquals(
new Gson().toJson(functionConfig),
new Gson().toJson(convertedConfig)
@@ -96,6 +100,8 @@ public class FunctionConfigUtilsTest {
// add default resources
functionConfig.setResources(Resources.getDefaultResources());
+ // set default cleanupSubscription config
+ functionConfig.setCleanupSubscription(true);
assertEquals(
new Gson().toJson(functionConfig),
new Gson().toJson(convertedConfig)
@@ -199,6 +205,22 @@ public class FunctionConfigUtilsTest {
);
}
+ @Test
+ public void testMergeCleanupSubscription() {
+ FunctionConfig functionConfig = createFunctionConfig();
+ FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("cleanupSubscription", true);
+ FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+ assertTrue(mergedConfig.getCleanupSubscription());
+
+ newFunctionConfig = createUpdatedFunctionConfig("cleanupSubscription", false);
+ mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+ assertFalse(mergedConfig.getCleanupSubscription());
+
+ newFunctionConfig = createUpdatedFunctionConfig("cleanupSubscription", true);
+ mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+ assertTrue(mergedConfig.getCleanupSubscription());
+ }
+
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
public void testMergeDifferentProcessingGuarantees() {
FunctionConfig functionConfig = createFunctionConfig();
@@ -392,6 +414,7 @@ public class FunctionConfigUtilsTest {
functionConfig.setAutoAck(true);
functionConfig.setTimeoutMs(2000l);
functionConfig.setWindowConfig(new WindowConfig().setWindowLengthCount(10));
+ functionConfig.setCleanupSubscription(true);
return functionConfig;
}
@@ -425,7 +448,9 @@ public class FunctionConfigUtilsTest {
.setSchemaType(JSONSchema.class.getName()).build());
Function.SourceSpec sourceSpec = Function.SourceSpec.newBuilder()
.putAllInputSpecs(consumerSpecMap)
- .setSubscriptionType(Function.SubscriptionType.FAILOVER).build();
+ .setSubscriptionType(Function.SubscriptionType.FAILOVER)
+ .setCleanupSubscription(true)
+ .build();
boolean autoAck = true;
String logTopic = "log-topic1";
Function.Resources resources = Function.Resources.newBuilder().setCpu(1.5).setDisk(1024 * 20).setRam(1024 * 10).build();
@@ -466,5 +491,6 @@ public class FunctionConfigUtilsTest {
assertEquals(functionConfig.getResources().getRam().longValue(), resources.getRam());
assertEquals(functionConfig.getOutput(), sinkSpec.getTopic());
assertEquals(functionConfig.getInputSpecs().keySet(), sourceSpec.getInputSpecsMap().keySet());
+ assertEquals(functionConfig.getCleanupSubscription().booleanValue(), sourceSpec.getCleanupSubscription());
}
}