You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/03/07 17:25:43 UTC

[pulsar] branch master updated: Allow users to update everything in inputspecs except for isregexpattern (#3770)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f7874cd  Allow users to update everything in inputspecs except for isregexpattern (#3770)
f7874cd is described below

commit f7874cd90f42f1dac27391502dc1567a8d7b1e82
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Mar 7 09:25:39 2019 -0800

    Allow users to update everything in inputspecs except for isregexpattern (#3770)
    
    * Allow users to update everything in inputspecs except for isregexpattern
    
    * Added more tests and fixed a bug caught by them
---
 .../pulsar/functions/utils/FunctionConfigUtils.java   |  5 +++--
 .../pulsar/functions/utils/SinkConfigUtils.java       |  5 +++--
 .../functions/utils/FunctionConfigUtilsTest.java      | 19 +++++++++++++++++++
 .../pulsar/functions/utils/SinkConfigUtilsTest.java   | 19 +++++++++++++++++++
 4 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index aef096a..1e6a85c 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -637,9 +637,10 @@ public class FunctionConfigUtils {
                 if (!existingConfig.getInputSpecs().containsKey(topicName)) {
                     throw new IllegalArgumentException("Input Topics cannot be altered");
                 }
-                if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) {
-                    throw new IllegalArgumentException("Input Specs mismatch");
+                if (consumerConfig.isRegexPattern() != existingConfig.getInputSpecs().get(topicName).isRegexPattern()) {
+                    throw new IllegalArgumentException("isRegexPattern for input topic " + topicName + " cannot be altered");
                 }
+                mergedConfig.getInputSpecs().put(topicName, consumerConfig);
             });
         }
         if (!StringUtils.isEmpty(newConfig.getOutput()) && !newConfig.getOutput().equals(existingConfig.getOutput())) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index e7c53c5..ecbe487 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -416,9 +416,10 @@ public class SinkConfigUtils {
                 if (!existingConfig.getInputSpecs().containsKey(topicName)) {
                     throw new IllegalArgumentException("Input Topics cannot be altered");
                 }
-                if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) {
-                    throw new IllegalArgumentException("Input Specs mismatch");
+                if (consumerConfig.isRegexPattern() != existingConfig.getInputSpecs().get(topicName).isRegexPattern()) {
+                    throw new IllegalArgumentException("isRegexPattern for input topic " + topicName + " cannot be altered");
                 }
+                mergedConfig.getInputSpecs().put(topicName, consumerConfig);
             });
         }
         if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index b758c72..0f3a5a3 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -157,6 +157,25 @@ public class FunctionConfigUtilsTest {
         FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "isRegexPattern for input topic test-input cannot be altered")
+    public void testMergeDifferentInputSpecWithRegexChange() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+        inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(false).serdeClassName("my-serde").build());
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("inputSpecs", inputSpecs);
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test
+    public void testMergeDifferentInputSpec() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+        inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").receiverQueueSize(58).build());
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("inputSpecs", inputSpecs);
+        FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertEquals(mergedConfig.getInputSpecs().get("test-input"), newFunctionConfig.getInputSpecs().get("test-input"));
+    }
+
     @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Output topics differ")
     public void testMergeDifferentOutput() {
         FunctionConfig functionConfig = createFunctionConfig();
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index dcc415b..efde8ab 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -120,6 +120,25 @@ public class SinkConfigUtilsTest {
         SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "isRegexPattern for input topic test-input cannot be altered")
+    public void testMergeDifferentInputSpecWithRegexChange() {
+        SinkConfig sinkConfig = createSinkConfig();
+        Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+        inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(false).serdeClassName("my-serde").build());
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("inputSpecs", inputSpecs);
+        SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+    }
+
+    @Test
+    public void testMergeDifferentInputSpec() {
+        SinkConfig sinkConfig = createSinkConfig();
+        Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+        inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").receiverQueueSize(58).build());
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("inputSpecs", inputSpecs);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+        assertEquals(mergedConfig.getInputSpecs().get("test-input"), newSinkConfig.getInputSpecs().get("test-input"));
+    }
+
     @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
     public void testMergeDifferentProcessingGuarantees() {
         SinkConfig sinkConfig = createSinkConfig();