You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/03/07 17:25:43 UTC
[pulsar] branch master updated: Allow users to update everything in
inputspecs except for isregexpattern (#3770)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f7874cd Allow users to update everything in inputspecs except for isregexpattern (#3770)
f7874cd is described below
commit f7874cd90f42f1dac27391502dc1567a8d7b1e82
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Mar 7 09:25:39 2019 -0800
Allow users to update everything in inputspecs except for isregexpattern (#3770)
* Allow users to update everything in inputspecs except for isregexpattern
* Added more tests and fixed a bug caught by them
---
.../pulsar/functions/utils/FunctionConfigUtils.java | 5 +++--
.../pulsar/functions/utils/SinkConfigUtils.java | 5 +++--
.../functions/utils/FunctionConfigUtilsTest.java | 19 +++++++++++++++++++
.../pulsar/functions/utils/SinkConfigUtilsTest.java | 19 +++++++++++++++++++
4 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index aef096a..1e6a85c 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -637,9 +637,10 @@ public class FunctionConfigUtils {
if (!existingConfig.getInputSpecs().containsKey(topicName)) {
throw new IllegalArgumentException("Input Topics cannot be altered");
}
- if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) {
- throw new IllegalArgumentException("Input Specs mismatch");
+ if (consumerConfig.isRegexPattern() != existingConfig.getInputSpecs().get(topicName).isRegexPattern()) {
+ throw new IllegalArgumentException("isRegexPattern for input topic " + topicName + " cannot be altered");
}
+ mergedConfig.getInputSpecs().put(topicName, consumerConfig);
});
}
if (!StringUtils.isEmpty(newConfig.getOutput()) && !newConfig.getOutput().equals(existingConfig.getOutput())) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index e7c53c5..ecbe487 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -416,9 +416,10 @@ public class SinkConfigUtils {
if (!existingConfig.getInputSpecs().containsKey(topicName)) {
throw new IllegalArgumentException("Input Topics cannot be altered");
}
- if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) {
- throw new IllegalArgumentException("Input Specs mismatch");
+ if (consumerConfig.isRegexPattern() != existingConfig.getInputSpecs().get(topicName).isRegexPattern()) {
+ throw new IllegalArgumentException("isRegexPattern for input topic " + topicName + " cannot be altered");
}
+ mergedConfig.getInputSpecs().put(topicName, consumerConfig);
});
}
if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index b758c72..0f3a5a3 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -157,6 +157,25 @@ public class FunctionConfigUtilsTest {
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
}
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "isRegexPattern for input topic test-input cannot be altered")
+ public void testMergeDifferentInputSpecWithRegexChange() {
+ FunctionConfig functionConfig = createFunctionConfig();
+ Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+ inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(false).serdeClassName("my-serde").build());
+ FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("inputSpecs", inputSpecs);
+ FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+ }
+
+ @Test
+ public void testMergeDifferentInputSpec() {
+ FunctionConfig functionConfig = createFunctionConfig();
+ Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+ inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").receiverQueueSize(58).build());
+ FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("inputSpecs", inputSpecs);
+ FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+ assertEquals(mergedConfig.getInputSpecs().get("test-input"), newFunctionConfig.getInputSpecs().get("test-input"));
+ }
+
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Output topics differ")
public void testMergeDifferentOutput() {
FunctionConfig functionConfig = createFunctionConfig();
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index dcc415b..efde8ab 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -120,6 +120,25 @@ public class SinkConfigUtilsTest {
SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
}
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "isRegexPattern for input topic test-input cannot be altered")
+ public void testMergeDifferentInputSpecWithRegexChange() {
+ SinkConfig sinkConfig = createSinkConfig();
+ Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+ inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(false).serdeClassName("my-serde").build());
+ SinkConfig newSinkConfig = createUpdatedSinkConfig("inputSpecs", inputSpecs);
+ SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+ }
+
+ @Test
+ public void testMergeDifferentInputSpec() {
+ SinkConfig sinkConfig = createSinkConfig();
+ Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+ inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").receiverQueueSize(58).build());
+ SinkConfig newSinkConfig = createUpdatedSinkConfig("inputSpecs", inputSpecs);
+ SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+ assertEquals(mergedConfig.getInputSpecs().get("test-input"), newSinkConfig.getInputSpecs().get("test-input"));
+ }
+
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
public void testMergeDifferentProcessingGuarantees() {
SinkConfig sinkConfig = createSinkConfig();