You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/08/10 15:17:01 UTC
[pulsar] 04/12: fix lafla source config when consumerConfigProperties='' (#16731)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4bc4659eadc1758b96291e91040c6e108f1e3757
Author: Bonan Hou <bo...@streamnative.io>
AuthorDate: Tue Jul 26 10:02:13 2022 +0800
fix lafla source config when consumerConfigProperties='' (#16731)
(cherry picked from commit 1206a37246317cdafd356dc7bd0caf9c2cc9cbc7)
---
.../org/apache/pulsar/io/kafka/KafkaSourceConfig.java | 2 ++
.../io/kafka/source/KafkaAbstractSourceTest.java | 19 +++++++++++++++++++
2 files changed, 21 insertions(+)
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
index 3fa687eceb6..332a080cc05 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.io.kafka;
+import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import lombok.Data;
@@ -112,6 +113,7 @@ public class KafkaSourceConfig implements Serializable {
public static KafkaSourceConfig load(Map<String, Object> map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
+ mapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
return mapper.readValue(new ObjectMapper().writeValueAsString(map), KafkaSourceConfig.class);
}
}
\ No newline at end of file
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 1b676e2cce5..4bcf6ed8905 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.io.kafka.source;
+import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -45,6 +46,7 @@ import java.util.concurrent.CompletableFuture;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.expectThrows;
import static org.testng.Assert.fail;
@@ -104,6 +106,23 @@ public class KafkaAbstractSourceTest {
source.close();
}
+ @Test
+ public void loadConsumerConfigPropertiesFromMapTest() throws Exception {
+ Map<String, Object> config = new HashMap<>();
+ config.put("consumerConfigProperties", "");
+ KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.load(config);
+ assertNotNull(kafkaSourceConfig);
+ assertNull(kafkaSourceConfig.getConsumerConfigProperties());
+
+ config.put("consumerConfigProperties", null);
+ kafkaSourceConfig = KafkaSourceConfig.load(config);
+ assertNull(kafkaSourceConfig.getConsumerConfigProperties());
+
+ config.put("consumerConfigProperties", ImmutableMap.of("foo", "bar"));
+ kafkaSourceConfig = KafkaSourceConfig.load(config);
+ assertEquals(kafkaSourceConfig.getConsumerConfigProperties(), ImmutableMap.of("foo", "bar"));
+ }
+
@Test
public final void loadFromYamlFileTest() throws IOException {
File yamlFile = getFile("kafkaSourceConfig.yaml");