You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/08 09:04:11 UTC

[pulsar] 07/33: fix lafla source config when consumerConfigProperties='' (#16731)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1147676c605d3a91a9b3ad7e91cb1bd070a3e025
Author: Bonan Hou <bo...@streamnative.io>
AuthorDate: Tue Jul 26 10:02:13 2022 +0800

    fix lafla source config when consumerConfigProperties='' (#16731)
    
    (cherry picked from commit 1206a37246317cdafd356dc7bd0caf9c2cc9cbc7)
---
 .../org/apache/pulsar/io/kafka/KafkaSourceConfig.java |  2 ++
 .../io/kafka/source/KafkaAbstractSourceTest.java      | 19 +++++++++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
index fd2d130840b..28b5944ff30 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.io.kafka;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import java.io.File;
@@ -154,6 +155,7 @@ public class KafkaSourceConfig implements Serializable {
 
     public static KafkaSourceConfig load(Map<String, Object> map) throws IOException {
         ObjectMapper mapper = new ObjectMapper();
+        mapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), KafkaSourceConfig.class);
     }
 }
\ No newline at end of file
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 89797636de9..a9a5c22eb41 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.io.kafka.source;
 
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -41,6 +42,7 @@ import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.expectThrows;
 import static org.testng.Assert.fail;
 
@@ -100,6 +102,23 @@ public class KafkaAbstractSourceTest {
         source.close();
     }
 
+    @Test
+    public void loadConsumerConfigPropertiesFromMapTest() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put("consumerConfigProperties", "");
+        KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.load(config);
+        assertNotNull(kafkaSourceConfig);
+        assertNull(kafkaSourceConfig.getConsumerConfigProperties());
+
+        config.put("consumerConfigProperties", null);
+        kafkaSourceConfig = KafkaSourceConfig.load(config);
+        assertNull(kafkaSourceConfig.getConsumerConfigProperties());
+
+        config.put("consumerConfigProperties", ImmutableMap.of("foo", "bar"));
+        kafkaSourceConfig = KafkaSourceConfig.load(config);
+        assertEquals(kafkaSourceConfig.getConsumerConfigProperties(), ImmutableMap.of("foo", "bar"));
+    }
+
     @Test
     public final void loadFromYamlFileTest() throws IOException {
         File yamlFile = getFile("kafkaSourceConfig.yaml");