You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:19 UTC
[pulsar] 33/38: Fix validation of function's update (#6888)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 54329f2bdb010ed25849b076aab779db6c1ba531
Author: Sergii Zhevzhyk <vz...@users.noreply.github.com>
AuthorDate: Thu May 7 09:50:12 2020 +0200
Fix validation of function's update (#6888)
### Motivation
The validation of parameters for function's update was not properly implemented for the outputSerdeClassName parameter. It was checking the outputSchemaType field instead.
### Modifications
Updated the if conditions and added tests.
(cherry picked from commit bfec5231f3e9d65dfe5a919d38411309bb255750)
---
.../pulsar/functions/utils/FunctionConfigUtils.java | 2 +-
.../pulsar/functions/utils/FunctionConfigUtilsTest.java | 15 +++++++++++++++
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 3398a5a..4876a82 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -727,7 +727,7 @@ public class FunctionConfigUtils {
mergedConfig.getInputSpecs().put(topicName, consumerConfig);
});
}
- if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && !newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
+ if (!StringUtils.isEmpty(newConfig.getOutputSerdeClassName()) && !newConfig.getOutputSerdeClassName().equals(existingConfig.getOutputSerdeClassName())) {
throw new IllegalArgumentException("Output Serde mismatch");
}
if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && !newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 29c5e55..e0f1cea 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -416,6 +416,7 @@ public class FunctionConfigUtilsTest {
functionConfig.setInputSpecs(inputSpecs);
functionConfig.setOutput("test-output");
functionConfig.setOutputSerdeClassName("test-serde");
+ functionConfig.setOutputSchemaType("json");
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
functionConfig.setRetainOrdering(false);
@@ -503,4 +504,18 @@ public class FunctionConfigUtilsTest {
assertEquals(functionConfig.getInputSpecs().keySet(), sourceSpec.getInputSpecsMap().keySet());
assertEquals(functionConfig.getCleanupSubscription().booleanValue(), sourceSpec.getCleanupSubscription());
}
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Output Serde mismatch")
+ public void testMergeDifferentSerde() {
+ FunctionConfig functionConfig = createFunctionConfig();
+ FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("outputSerdeClassName", "test-updated-serde");
+ FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Output Schema mismatch")
+ public void testMergeDifferentOutputSchemaTypes() {
+ FunctionConfig functionConfig = createFunctionConfig();
+ FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("outputSchemaType", "avro");
+ FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+ }
}