You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/11 00:23:11 UTC
[pulsar] 04/08: #10882 use ObjectMapper to parse Sink/Source
configs (#10883)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 92aff2bbb835ffcca509e227a5af06ec7a20672b
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Thu Jun 10 14:56:44 2021 +0800
#10882 use ObjectMapper to parse Sink/Source configs (#10883)
Fixes #10882
### Motivation
CmdSink and CmdSource uses `gson` to parse the JSON configs from pulsar-admin. But most of connectors are using ObjectMapper to serde the config into actual class. `gson` will also convert int/long value into float by default, which will lead ObjectMapper cannot parse float string into int/long correctlly.
### Modifications
use ObjectMapper to parse sink/source config.
(cherry picked from commit 2c9ea8113cbe0d2cc97e4e308f5ed0487fd13c1e)
---
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 21 ++++++++++++-----
.../org/apache/pulsar/admin/cli/CmdSources.java | 26 ++++++++++++++++------
.../org/apache/pulsar/admin/cli/TestCmdSinks.java | 16 +++++++++++--
.../apache/pulsar/admin/cli/TestCmdSources.java | 18 +++++++++++++--
4 files changed, 65 insertions(+), 16 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 16e1664..b44affb 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -27,6 +27,9 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.StringConverter;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
@@ -55,6 +58,7 @@ import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
@Getter
@Parameters(commandDescription = "Interface for managing Pulsar IO sinks (egress data from Pulsar)")
@@ -463,8 +467,12 @@ public class CmdSinks extends CmdBase {
sinkConfig.setResources(resources);
}
- if (null != sinkConfigString) {
- sinkConfig.setConfigs(parseConfigs(sinkConfigString));
+ try {
+ if (null != sinkConfigString) {
+ sinkConfig.setConfigs(parseConfigs(sinkConfigString));
+ }
+ } catch (Exception ex) {
+ throw new ParameterException("Cannot parse sink-config", ex);
}
if (autoAck != null) {
@@ -485,9 +493,12 @@ public class CmdSinks extends CmdBase {
validateSinkConfigs(sinkConfig);
}
- protected Map<String, Object> parseConfigs(String str) {
- Type type = new TypeToken<Map<String, Object>>(){}.getType();
- return new Gson().fromJson(str, type);
+ protected Map<String, Object> parseConfigs(String str) throws JsonProcessingException {
+ ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
+ TypeReference<HashMap<String,Object>> typeRef
+ = new TypeReference<HashMap<String,Object>>() {};
+
+ return mapper.readValue(str, typeRef);
}
protected void validateSinkConfigs(SinkConfig sinkConfig) {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 3ab6ccc..337847e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -27,6 +27,9 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.StringConverter;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
@@ -35,6 +38,7 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Type;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -58,6 +62,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
@Getter
@Parameters(commandDescription = "Interface for managing Pulsar IO Sources (ingress data into Pulsar)")
@@ -417,8 +422,12 @@ public class CmdSources extends CmdBase {
sourceConfig.setResources(resources);
}
- if (null != sourceConfigString) {
- sourceConfig.setConfigs(parseConfigs(sourceConfigString));
+ try {
+ if (null != sourceConfigString) {
+ sourceConfig.setConfigs(parseConfigs(sourceConfigString));
+ }
+ } catch (Exception ex) {
+ throw new ParameterException("Cannot parse source-config", ex);
}
if (null != batchSourceConfigString) {
@@ -432,12 +441,15 @@ public class CmdSources extends CmdBase {
validateSourceConfigs(sourceConfig);
}
- protected Map<String, Object> parseConfigs(String str) {
- Type type = new TypeToken<Map<String, Object>>(){}.getType();
- return new Gson().fromJson(str, type);
+ protected Map<String, Object> parseConfigs(String str) throws JsonProcessingException {
+ ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
+ TypeReference<HashMap<String,Object>> typeRef
+ = new TypeReference<HashMap<String,Object>>() {};
+
+ return mapper.readValue(str, typeRef);
}
-
- protected BatchSourceConfig parseBatchSourceConfigs(String str) {
+
+ protected BatchSourceConfig parseBatchSourceConfigs(String str) {
return new Gson().fromJson(str, BatchSourceConfig.class);
}
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 1f4819d..87f9a3f 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import com.beust.jcommander.ParameterException;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import java.io.Closeable;
import java.io.File;
@@ -94,7 +95,7 @@ public class TestCmdSinks {
private static final Double CPU = 100.0;
private static final Long RAM = 1024L * 1024L;
private static final Long DISK = 1024L * 1024L * 1024L;
- private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 0000 2018\"}";
+ private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\",\"int\":1000,\"int_string\":\"1000\",\"float\":1000.0,\"float_string\":\"1000.0\"}";
private PulsarAdmin pulsarAdmin;
private Sinks sink;
@@ -144,7 +145,7 @@ public class TestCmdSinks {
}
}
- public SinkConfig getSinkConfig() {
+ public SinkConfig getSinkConfig() throws JsonProcessingException {
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setTenant(TENANT);
sinkConfig.setNamespace(NAMESPACE);
@@ -738,4 +739,15 @@ public class TestCmdSinks {
}
+
+ @Test
+ public void testParseConfigs() throws Exception {
+ SinkConfig testSinkConfig = getSinkConfig();
+ Map<String, Object> config = testSinkConfig.getConfigs();
+ Assert.assertEquals(config.get("int"), 1000);
+ Assert.assertEquals(config.get("int_string"), "1000");
+ Assert.assertEquals(config.get("float"), 1000.0);
+ Assert.assertEquals(config.get("float_string"), "1000.0");
+ Assert.assertEquals(config.get("created_at"), "Mon Jul 02 00:33:15 +0000 2018");
+ }
}
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 23791ae..13e0ff0 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -28,11 +28,13 @@ import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import com.beust.jcommander.ParameterException;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.Map;
import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -75,7 +77,8 @@ public class TestCmdSources {
private static final Double CPU = 100.0;
private static final Long RAM = 1024L * 1024L;
private static final Long DISK = 1024L * 1024L * 1024L;
- private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\"}";
+ private static final String SINK_CONFIG_STRING =
+ "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\",\"int\":1000,\"int_string\":\"1000\",\"float\":1000.0,\"float_string\":\"1000.0\"}";
private static final String BATCH_SOURCE_CONFIG_STRING = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.CronTriggerer\","
+ "\"discoveryTriggererConfig\": {\"cron\": \"5 0 0 0 0 *\"} }";
@@ -122,7 +125,7 @@ public class TestCmdSources {
}
}
- public SourceConfig getSourceConfig() {
+ public SourceConfig getSourceConfig() throws JsonProcessingException {
SourceConfig sourceConfig = new SourceConfig();
sourceConfig.setTenant(TENANT);
sourceConfig.setNamespace(NAMESPACE);
@@ -690,4 +693,15 @@ public class TestCmdSources {
}
+
+ @Test
+ public void testParseConfigs() throws Exception {
+ SourceConfig testSourceConfig = getSourceConfig();
+ Map<String, Object> config = testSourceConfig.getConfigs();
+ Assert.assertEquals(config.get("int"), 1000);
+ Assert.assertEquals(config.get("int_string"), "1000");
+ Assert.assertEquals(config.get("float"), 1000.0);
+ Assert.assertEquals(config.get("float_string"), "1000.0");
+ Assert.assertEquals(config.get("created_at"), "Mon Jul 02 00:33:15 +0000 2018");
+ }
}
\ No newline at end of file