You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/11 00:23:11 UTC

[pulsar] 04/08: #10882 use ObjectMapper to parse Sink/Source configs (#10883)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 92aff2bbb835ffcca509e227a5af06ec7a20672b
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Thu Jun 10 14:56:44 2021 +0800

    #10882 use ObjectMapper to parse Sink/Source configs (#10883)
    
    Fixes #10882
    
    ### Motivation
    
    CmdSink and CmdSource uses `gson` to parse the JSON configs from pulsar-admin. But most of connectors are using ObjectMapper to serde the config into actual class. `gson` will also convert int/long value into float by default, which will lead ObjectMapper cannot parse float string into int/long correctlly.
    
    ### Modifications
    
    use ObjectMapper to parse sink/source config.
    
    (cherry picked from commit 2c9ea8113cbe0d2cc97e4e308f5ed0487fd13c1e)
---
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 21 ++++++++++++-----
 .../org/apache/pulsar/admin/cli/CmdSources.java    | 26 ++++++++++++++++------
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java  | 16 +++++++++++--
 .../apache/pulsar/admin/cli/TestCmdSources.java    | 18 +++++++++++++--
 4 files changed, 65 insertions(+), 16 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 16e1664..b44affb 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -27,6 +27,9 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.StringConverter;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
@@ -55,6 +58,7 @@ import org.apache.pulsar.common.functions.UpdateOptionsImpl;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 
 @Getter
 @Parameters(commandDescription = "Interface for managing Pulsar IO sinks (egress data from Pulsar)")
@@ -463,8 +467,12 @@ public class CmdSinks extends CmdBase {
                 sinkConfig.setResources(resources);
             }
 
-            if (null != sinkConfigString) {
-                sinkConfig.setConfigs(parseConfigs(sinkConfigString));
+            try {
+                if (null != sinkConfigString) {
+                    sinkConfig.setConfigs(parseConfigs(sinkConfigString));
+                }
+            } catch (Exception ex) {
+                throw new ParameterException("Cannot parse sink-config", ex);
             }
 
             if (autoAck != null) {
@@ -485,9 +493,12 @@ public class CmdSinks extends CmdBase {
             validateSinkConfigs(sinkConfig);
         }
 
-        protected Map<String, Object> parseConfigs(String str) {
-            Type type = new TypeToken<Map<String, Object>>(){}.getType();
-            return new Gson().fromJson(str, type);
+        protected Map<String, Object> parseConfigs(String str) throws JsonProcessingException {
+            ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
+            TypeReference<HashMap<String,Object>> typeRef
+                = new TypeReference<HashMap<String,Object>>() {};
+
+            return mapper.readValue(str, typeRef);
         }
 
         protected void validateSinkConfigs(SinkConfig sinkConfig) {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 3ab6ccc..337847e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -27,6 +27,9 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.StringConverter;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
@@ -35,6 +38,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Type;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -58,6 +62,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 
 @Getter
 @Parameters(commandDescription = "Interface for managing Pulsar IO Sources (ingress data into Pulsar)")
@@ -417,8 +422,12 @@ public class CmdSources extends CmdBase {
                 sourceConfig.setResources(resources);
             }
 
-            if (null != sourceConfigString) {
-                sourceConfig.setConfigs(parseConfigs(sourceConfigString));
+            try {
+                if (null != sourceConfigString) {
+                    sourceConfig.setConfigs(parseConfigs(sourceConfigString));
+                }
+            } catch (Exception ex) {
+                throw new ParameterException("Cannot parse source-config", ex);
             }
             
             if (null != batchSourceConfigString) {
@@ -432,12 +441,15 @@ public class CmdSources extends CmdBase {
             validateSourceConfigs(sourceConfig);
         }
 
-        protected Map<String, Object> parseConfigs(String str) {
-            Type type = new TypeToken<Map<String, Object>>(){}.getType();
-            return new Gson().fromJson(str, type); 
+        protected Map<String, Object> parseConfigs(String str) throws JsonProcessingException {
+            ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
+            TypeReference<HashMap<String,Object>> typeRef
+                    = new TypeReference<HashMap<String,Object>>() {};
+
+            return mapper.readValue(str, typeRef);
         }
-        
-        protected BatchSourceConfig parseBatchSourceConfigs(String str) {
+
+            protected BatchSourceConfig parseBatchSourceConfigs(String str) {
         	return new Gson().fromJson(str, BatchSourceConfig.class);
         }
 
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 1f4819d..87f9a3f 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
 import com.beust.jcommander.ParameterException;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
 import java.io.Closeable;
 import java.io.File;
@@ -94,7 +95,7 @@ public class TestCmdSinks {
     private static final Double CPU = 100.0;
     private static final Long RAM = 1024L * 1024L;
     private static final Long DISK = 1024L * 1024L * 1024L;
-    private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 0000 2018\"}";
+    private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\",\"int\":1000,\"int_string\":\"1000\",\"float\":1000.0,\"float_string\":\"1000.0\"}";
 
     private PulsarAdmin pulsarAdmin;
     private Sinks sink;
@@ -144,7 +145,7 @@ public class TestCmdSinks {
         }
     }
 
-    public SinkConfig getSinkConfig() {
+    public SinkConfig getSinkConfig() throws JsonProcessingException {
         SinkConfig sinkConfig = new SinkConfig();
         sinkConfig.setTenant(TENANT);
         sinkConfig.setNamespace(NAMESPACE);
@@ -738,4 +739,15 @@ public class TestCmdSinks {
 
 
     }
+
+    @Test
+    public void testParseConfigs() throws Exception {
+        SinkConfig testSinkConfig = getSinkConfig();
+        Map<String, Object> config = testSinkConfig.getConfigs();
+        Assert.assertEquals(config.get("int"), 1000);
+        Assert.assertEquals(config.get("int_string"), "1000");
+        Assert.assertEquals(config.get("float"), 1000.0);
+        Assert.assertEquals(config.get("float_string"), "1000.0");
+        Assert.assertEquals(config.get("created_at"), "Mon Jul 02 00:33:15 +0000 2018");
+    }
 }
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 23791ae..13e0ff0 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -28,11 +28,13 @@ import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
 import com.beust.jcommander.ParameterException;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.Map;
 
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -75,7 +77,8 @@ public class TestCmdSources {
     private static final Double CPU = 100.0;
     private static final Long RAM = 1024L * 1024L;
     private static final Long DISK = 1024L * 1024L * 1024L;
-    private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\"}";
+    private static final String SINK_CONFIG_STRING =
+            "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\",\"int\":1000,\"int_string\":\"1000\",\"float\":1000.0,\"float_string\":\"1000.0\"}";
     private static final String BATCH_SOURCE_CONFIG_STRING = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.CronTriggerer\","
 			+ "\"discoveryTriggererConfig\": {\"cron\": \"5 0 0 0 0 *\"} }";
 
@@ -122,7 +125,7 @@ public class TestCmdSources {
         }
     }
 
-    public SourceConfig getSourceConfig() {
+    public SourceConfig getSourceConfig() throws JsonProcessingException {
         SourceConfig sourceConfig = new SourceConfig();
         sourceConfig.setTenant(TENANT);
         sourceConfig.setNamespace(NAMESPACE);
@@ -690,4 +693,15 @@ public class TestCmdSources {
 
 
     }
+
+    @Test
+    public void testParseConfigs() throws Exception {
+        SourceConfig testSourceConfig = getSourceConfig();
+        Map<String, Object> config = testSourceConfig.getConfigs();
+        Assert.assertEquals(config.get("int"), 1000);
+        Assert.assertEquals(config.get("int_string"), "1000");
+        Assert.assertEquals(config.get("float"), 1000.0);
+        Assert.assertEquals(config.get("float_string"), "1000.0");
+        Assert.assertEquals(config.get("created_at"), "Mon Jul 02 00:33:15 +0000 2018");
+    }
 }
\ No newline at end of file