You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/08 04:38:15 UTC

[pulsar] branch master updated: fix: function config cleanupSubscription update bug (#3771)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 925879b  fix: function config cleanupSubscription update bug (#3771)
925879b is described below

commit 925879be0691c4daf5cd49c55cfb2b6113e1d31c
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Mar 7 20:38:10 2019 -0800

    fix: function config cleanupSubscription update bug (#3771)
    
    * fix: function config cleanupSubscription update bug
    
    * add and fix unit tests
---
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 76 +++++++++++++++++++++-
 .../functions/utils/FunctionConfigUtils.java       |  4 ++
 .../functions/utils/FunctionConfigUtilsTest.java   | 28 +++++++-
 3 files changed, 106 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index c8a95c1..253d2b6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -1207,10 +1207,18 @@ public class PulsarFunctionE2ETest {
         functionConfig.setInputs(Collections.singleton(sourceTopic));
         functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
         functionConfig.setOutput(sinkTopic);
-        functionConfig.setCleanupSubscription(true);
+        functionConfig.setCleanupSubscription(false);
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
 
         admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+        retryStrategically((test) -> {
+            try {
+                return admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription();
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());
 
         retryStrategically((test) -> {
             try {
@@ -1222,6 +1230,19 @@ public class PulsarFunctionE2ETest {
         // validate pulsar source consumer has started on the topic
         assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
 
+        // test update cleanup subscription
+        functionConfig.setCleanupSubscription(true);
+        admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                return admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription();
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        assertTrue(admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());
+
         int totalMsgs = 10;
         for (int i = 0; i < totalMsgs; i++) {
             String data = "my-message-" + i;
@@ -1266,6 +1287,59 @@ public class PulsarFunctionE2ETest {
 
         // make sure subscriptions are cleanup
         assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
+
+
+        /** test do not cleanup subscription **/
+        functionConfig.setCleanupSubscription(false);
+        admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        // validate pulsar source consumer has started on the topic
+        assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+        retryStrategically((test) -> {
+            try {
+                FunctionConfig result = admin.functions().getFunction(tenant, namespacePortion, functionName);
+                return result.getParallelism() == 2 && result.getCleanupSubscription() == false;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());
+
+        // test update another config and making sure that subscription cleanup remains unchanged
+        functionConfig.setParallelism(2);
+        admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                FunctionConfig result = admin.functions().getFunction(tenant, namespacePortion, functionName);
+                return result.getParallelism() == 2 && result.getCleanupSubscription() == false;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());
+
+        // delete functions
+        admin.functions().deleteFunction(tenant, namespacePortion, functionName);
+
+        retryStrategically((test) -> {
+            try {
+                return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // make sure subscriptions are cleanup
+        assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
     }
 
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 1e6a85c..7932f58 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -257,6 +257,7 @@ public class FunctionConfigUtils {
             functionConfig.setRetainOrdering(false);
             functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         }
+        functionConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());
         functionConfig.setAutoAck(functionDetails.getAutoAck());
         if (functionDetails.getSource().getTimeoutMs() != 0) {
             functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
@@ -694,6 +695,9 @@ public class FunctionConfigUtils {
         if (newConfig.getTimeoutMs() != null) {
             mergedConfig.setTimeoutMs(newConfig.getTimeoutMs());
         }
+        if (newConfig.getCleanupSubscription() != null) {
+            mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription());
+        }
         return mergedConfig;
     }
 }
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 0f3a5a3..68dddc2 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -35,6 +35,8 @@ import java.util.Map;
 import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
 import static org.apache.pulsar.common.functions.FunctionConfig.Runtime.PYTHON;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 /**
  * Unit test of {@link Reflections}.
@@ -65,6 +67,8 @@ public class FunctionConfigUtilsTest {
 
         // add default resources
         functionConfig.setResources(Resources.getDefaultResources());
+        // set default cleanupSubscription config
+        functionConfig.setCleanupSubscription(true);
         assertEquals(
                 new Gson().toJson(functionConfig),
                 new Gson().toJson(convertedConfig)
@@ -96,6 +100,8 @@ public class FunctionConfigUtilsTest {
 
         // add default resources
         functionConfig.setResources(Resources.getDefaultResources());
+        // set default cleanupSubscription config
+        functionConfig.setCleanupSubscription(true);
         assertEquals(
                 new Gson().toJson(functionConfig),
                 new Gson().toJson(convertedConfig)
@@ -199,6 +205,22 @@ public class FunctionConfigUtilsTest {
         );
     }
 
+    @Test
+    public void testMergeCleanupSubscription() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("cleanupSubscription", true);
+        FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertTrue(mergedConfig.getCleanupSubscription());
+
+        newFunctionConfig = createUpdatedFunctionConfig("cleanupSubscription", false);
+        mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertFalse(mergedConfig.getCleanupSubscription());
+
+        newFunctionConfig = createUpdatedFunctionConfig("cleanupSubscription", true);
+        mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertTrue(mergedConfig.getCleanupSubscription());
+    }
+
     @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
     public void testMergeDifferentProcessingGuarantees() {
         FunctionConfig functionConfig = createFunctionConfig();
@@ -392,6 +414,7 @@ public class FunctionConfigUtilsTest {
         functionConfig.setAutoAck(true);
         functionConfig.setTimeoutMs(2000l);
         functionConfig.setWindowConfig(new WindowConfig().setWindowLengthCount(10));
+        functionConfig.setCleanupSubscription(true);
         return functionConfig;
     }
 
@@ -425,7 +448,9 @@ public class FunctionConfigUtilsTest {
                 .setSchemaType(JSONSchema.class.getName()).build());
         Function.SourceSpec sourceSpec = Function.SourceSpec.newBuilder()
                 .putAllInputSpecs(consumerSpecMap)
-                .setSubscriptionType(Function.SubscriptionType.FAILOVER).build();
+                .setSubscriptionType(Function.SubscriptionType.FAILOVER)
+                .setCleanupSubscription(true)
+                .build();
         boolean autoAck = true;
         String logTopic = "log-topic1";
         Function.Resources resources = Function.Resources.newBuilder().setCpu(1.5).setDisk(1024 * 20).setRam(1024 * 10).build();
@@ -466,5 +491,6 @@ public class FunctionConfigUtilsTest {
         assertEquals(functionConfig.getResources().getRam().longValue(), resources.getRam());
         assertEquals(functionConfig.getOutput(), sinkSpec.getTopic());
         assertEquals(functionConfig.getInputSpecs().keySet(), sourceSpec.getInputSpecsMap().keySet());
+        assertEquals(functionConfig.getCleanupSubscription().booleanValue(), sourceSpec.getCleanupSubscription());
     }
 }