You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:19 UTC

[pulsar] 33/38: Fix validation of function's update (#6888)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 54329f2bdb010ed25849b076aab779db6c1ba531
Author: Sergii Zhevzhyk <vz...@users.noreply.github.com>
AuthorDate: Thu May 7 09:50:12 2020 +0200

    Fix validation of function's update (#6888)
    
    ### Motivation
    
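    Validation of a function update was not implemented correctly for the outputSerdeClassName parameter: the check that reports "Output Serde mismatch" compared the outputSchemaType field instead of outputSerdeClassName, so a changed serde class name was not detected by that check.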
    
    ### Modifications
    
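    Updated the if condition to compare outputSerdeClassName, and added tests covering both the output serde mismatch and the output schema type mismatch.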
    (cherry picked from commit bfec5231f3e9d65dfe5a919d38411309bb255750)
---
 .../pulsar/functions/utils/FunctionConfigUtils.java       |  2 +-
 .../pulsar/functions/utils/FunctionConfigUtilsTest.java   | 15 +++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 3398a5a..4876a82 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -727,7 +727,7 @@ public class FunctionConfigUtils {
                 mergedConfig.getInputSpecs().put(topicName, consumerConfig);
             });
         }
-        if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && !newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
+        if (!StringUtils.isEmpty(newConfig.getOutputSerdeClassName()) && !newConfig.getOutputSerdeClassName().equals(existingConfig.getOutputSerdeClassName())) {
             throw new IllegalArgumentException("Output Serde mismatch");
         }
         if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && !newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 29c5e55..e0f1cea 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -416,6 +416,7 @@ public class FunctionConfigUtilsTest {
         functionConfig.setInputSpecs(inputSpecs);
         functionConfig.setOutput("test-output");
         functionConfig.setOutputSerdeClassName("test-serde");
+        functionConfig.setOutputSchemaType("json");
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
         functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         functionConfig.setRetainOrdering(false);
@@ -503,4 +504,18 @@ public class FunctionConfigUtilsTest {
         assertEquals(functionConfig.getInputSpecs().keySet(), sourceSpec.getInputSpecsMap().keySet());
         assertEquals(functionConfig.getCleanupSubscription().booleanValue(), sourceSpec.getCleanupSubscription());
     }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Output Serde mismatch")
+    public void testMergeDifferentSerde() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("outputSerdeClassName", "test-updated-serde");
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Output Schema mismatch")
+    public void testMergeDifferentOutputSchemaTypes() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("outputSchemaType", "avro");
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
 }