You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/05/07 22:31:37 UTC

[kafka] branch 2.5 updated: KAFKA-9667: Connect JSON serde strip trailing zeros (#8230)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 162448a  KAFKA-9667: Connect JSON serde strip trailing zeros (#8230)
162448a is described below

commit 162448a2aefa47116abffd234f844b9c6e08c4da
Author: Andy Coates <80...@users.noreply.github.com>
AuthorDate: Thu May 7 22:21:08 2020 +0100

    KAFKA-9667: Connect JSON serde strip trailing zeros (#8230)
    
    This change turns on exact decimal processing in the JSON converter when deserializing decimals, so trailing zeros are preserved instead of being stripped (the problem named in the title). Serialization was already using the decimal scale to output the right value, so a value such as `1.2300` can now be serialized to JSON and deserialized back to Connect without any loss of information.
    
    Author: Andy Coates <bi...@users.noreply.github.com>
    Reviewers: Randall Hauch <rh...@gmail.com>, Almog Gavra <al...@confluent.io>
---
 .../apache/kafka/connect/json/JsonConverter.java   | 81 ++++++++++++----------
 .../kafka/connect/json/JsonDeserializer.java       | 12 +++-
 .../apache/kafka/connect/json/JsonSerializer.java  | 20 ++++++
 .../kafka/connect/json/JsonConverterTest.java      | 31 ++++++++-
 4 files changed, 103 insertions(+), 41 deletions(-)

diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index ada2785..8a2d676 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import java.util.HashSet;
-import java.util.Set;
 import org.apache.kafka.common.cache.Cache;
 import org.apache.kafka.common.cache.LRUCache;
 import org.apache.kafka.common.cache.SynchronizedCache;
@@ -54,6 +52,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkSet;
+
 /**
  * Implementation of Converter that uses JSON to store schemas and objects. By default this converter will serialize Connect keys, values,
  * and headers with schemas, although this can be disabled with {@link JsonConverterConfig#SCHEMAS_ENABLE_CONFIG schemas.enable}
@@ -191,6 +191,9 @@ public class JsonConverter implements Converter, HeaderConverter {
     // Convert values in Kafka Connect form into/from their logical types. These logical converters are discovered by logical type
     // names specified in the field
     private static final HashMap<String, LogicalTypeConverter> LOGICAL_CONVERTERS = new HashMap<>();
+
+    private static final JsonNodeFactory JSON_NODE_FACTORY = JsonNodeFactory.withExactBigDecimals(true);
+
     static {
         LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter() {
             @Override
@@ -201,9 +204,9 @@ public class JsonConverter implements Converter, HeaderConverter {
                 final BigDecimal decimal = (BigDecimal) value;
                 switch (config.decimalFormat()) {
                     case NUMERIC:
-                        return JsonNodeFactory.instance.numberNode(decimal);
+                        return JSON_NODE_FACTORY.numberNode(decimal);
                     case BASE64:
-                        return JsonNodeFactory.instance.binaryNode(Decimal.fromLogical(schema, decimal));
+                        return JSON_NODE_FACTORY.binaryNode(Decimal.fromLogical(schema, decimal));
                     default:
                         throw new DataException("Unexpected " + JsonConverterConfig.DECIMAL_FORMAT_CONFIG + ": " + config.decimalFormat());
                 }
@@ -229,7 +232,7 @@ public class JsonConverter implements Converter, HeaderConverter {
             public JsonNode toJson(final Schema schema, final Object value, final JsonConverterConfig config) {
                 if (!(value instanceof java.util.Date))
                     throw new DataException("Invalid type for Date, expected Date but was " + value.getClass());
-                return JsonNodeFactory.instance.numberNode(Date.fromLogical(schema, (java.util.Date) value));
+                return JSON_NODE_FACTORY.numberNode(Date.fromLogical(schema, (java.util.Date) value));
             }
 
             @Override
@@ -245,7 +248,7 @@ public class JsonConverter implements Converter, HeaderConverter {
             public JsonNode toJson(final Schema schema, final Object value, final JsonConverterConfig config) {
                 if (!(value instanceof java.util.Date))
                     throw new DataException("Invalid type for Time, expected Date but was " + value.getClass());
-                return JsonNodeFactory.instance.numberNode(Time.fromLogical(schema, (java.util.Date) value));
+                return JSON_NODE_FACTORY.numberNode(Time.fromLogical(schema, (java.util.Date) value));
             }
 
             @Override
@@ -261,7 +264,7 @@ public class JsonConverter implements Converter, HeaderConverter {
             public JsonNode toJson(final Schema schema, final Object value, final JsonConverterConfig config) {
                 if (!(value instanceof java.util.Date))
                     throw new DataException("Invalid type for Timestamp, expected Date but was " + value.getClass());
-                return JsonNodeFactory.instance.numberNode(Timestamp.fromLogical(schema, (java.util.Date) value));
+                return JSON_NODE_FACTORY.numberNode(Timestamp.fromLogical(schema, (java.util.Date) value));
             }
 
             @Override
@@ -277,15 +280,23 @@ public class JsonConverter implements Converter, HeaderConverter {
     private Cache<Schema, ObjectNode> fromConnectSchemaCache;
     private Cache<JsonNode, Schema> toConnectSchemaCache;
 
-    private final JsonSerializer serializer = new JsonSerializer();
+    private final JsonSerializer serializer;
     private final JsonDeserializer deserializer;
 
     public JsonConverter() {
-        // this ensures that the JsonDeserializer maintains full precision on
-        // floating point numbers that cannot fit into float64
-        final Set<DeserializationFeature> deserializationFeatures = new HashSet<>();
-        deserializationFeatures.add(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
-        deserializer = new JsonDeserializer(deserializationFeatures);
+        serializer = new JsonSerializer(
+            mkSet(),
+            JSON_NODE_FACTORY
+        );
+
+        deserializer = new JsonDeserializer(
+            mkSet(
+                // this ensures that the JsonDeserializer maintains full precision on
+                // floating point numbers that cannot fit into float64
+                DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS
+            ),
+            JSON_NODE_FACTORY
+        );
     }
 
     @Override
@@ -362,7 +373,7 @@ public class JsonConverter implements Converter, HeaderConverter {
         // The deserialized data should either be an envelope object containing the schema and the payload or the schema
         // was stripped during serialization and we need to fill in an all-encompassing schema.
         if (!config.schemasEnabled()) {
-            ObjectNode envelope = JsonNodeFactory.instance.objectNode();
+            ObjectNode envelope = JSON_NODE_FACTORY.objectNode();
             envelope.set(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME, null);
             envelope.set(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME, jsonValue);
             jsonValue = envelope;
@@ -413,17 +424,17 @@ public class JsonConverter implements Converter, HeaderConverter {
                 jsonSchema = JsonSchema.STRING_SCHEMA.deepCopy();
                 break;
             case ARRAY:
-                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME);
+                jsonSchema = JSON_NODE_FACTORY.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME);
                 jsonSchema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, asJsonSchema(schema.valueSchema()));
                 break;
             case MAP:
-                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.MAP_TYPE_NAME);
+                jsonSchema = JSON_NODE_FACTORY.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.MAP_TYPE_NAME);
                 jsonSchema.set(JsonSchema.MAP_KEY_FIELD_NAME, asJsonSchema(schema.keySchema()));
                 jsonSchema.set(JsonSchema.MAP_VALUE_FIELD_NAME, asJsonSchema(schema.valueSchema()));
                 break;
             case STRUCT:
-                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME);
-                ArrayNode fields = JsonNodeFactory.instance.arrayNode();
+                jsonSchema = JSON_NODE_FACTORY.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME);
+                ArrayNode fields = JSON_NODE_FACTORY.arrayNode();
                 for (Field field : schema.fields()) {
                     ObjectNode fieldJsonSchema = asJsonSchema(field.schema()).deepCopy();
                     fieldJsonSchema.put(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME, field.name());
@@ -443,7 +454,7 @@ public class JsonConverter implements Converter, HeaderConverter {
         if (schema.doc() != null)
             jsonSchema.put(JsonSchema.SCHEMA_DOC_FIELD_NAME, schema.doc());
         if (schema.parameters() != null) {
-            ObjectNode jsonSchemaParams = JsonNodeFactory.instance.objectNode();
+            ObjectNode jsonSchemaParams = JSON_NODE_FACTORY.objectNode();
             for (Map.Entry<String, String> prop : schema.parameters().entrySet())
                 jsonSchemaParams.put(prop.getKey(), prop.getValue());
             jsonSchema.set(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME, jsonSchemaParams);
@@ -596,7 +607,7 @@ public class JsonConverter implements Converter, HeaderConverter {
             if (schema.defaultValue() != null)
                 return convertToJson(schema, schema.defaultValue());
             if (schema.isOptional())
-                return JsonNodeFactory.instance.nullNode();
+                return JSON_NODE_FACTORY.nullNode();
             throw new DataException("Conversion error: null value for field that is required and has no default value");
         }
 
@@ -617,32 +628,32 @@ public class JsonConverter implements Converter, HeaderConverter {
             }
             switch (schemaType) {
                 case INT8:
-                    return JsonNodeFactory.instance.numberNode((Byte) value);
+                    return JSON_NODE_FACTORY.numberNode((Byte) value);
                 case INT16:
-                    return JsonNodeFactory.instance.numberNode((Short) value);
+                    return JSON_NODE_FACTORY.numberNode((Short) value);
                 case INT32:
-                    return JsonNodeFactory.instance.numberNode((Integer) value);
+                    return JSON_NODE_FACTORY.numberNode((Integer) value);
                 case INT64:
-                    return JsonNodeFactory.instance.numberNode((Long) value);
+                    return JSON_NODE_FACTORY.numberNode((Long) value);
                 case FLOAT32:
-                    return JsonNodeFactory.instance.numberNode((Float) value);
+                    return JSON_NODE_FACTORY.numberNode((Float) value);
                 case FLOAT64:
-                    return JsonNodeFactory.instance.numberNode((Double) value);
+                    return JSON_NODE_FACTORY.numberNode((Double) value);
                 case BOOLEAN:
-                    return JsonNodeFactory.instance.booleanNode((Boolean) value);
+                    return JSON_NODE_FACTORY.booleanNode((Boolean) value);
                 case STRING:
                     CharSequence charSeq = (CharSequence) value;
-                    return JsonNodeFactory.instance.textNode(charSeq.toString());
+                    return JSON_NODE_FACTORY.textNode(charSeq.toString());
                 case BYTES:
                     if (value instanceof byte[])
-                        return JsonNodeFactory.instance.binaryNode((byte[]) value);
+                        return JSON_NODE_FACTORY.binaryNode((byte[]) value);
                     else if (value instanceof ByteBuffer)
-                        return JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array());
+                        return JSON_NODE_FACTORY.binaryNode(((ByteBuffer) value).array());
                     else
                         throw new DataException("Invalid type for bytes type: " + value.getClass());
                 case ARRAY: {
                     Collection collection = (Collection) value;
-                    ArrayNode list = JsonNodeFactory.instance.arrayNode();
+                    ArrayNode list = JSON_NODE_FACTORY.arrayNode();
                     for (Object elem : collection) {
                         Schema valueSchema = schema == null ? null : schema.valueSchema();
                         JsonNode fieldValue = convertToJson(valueSchema, elem);
@@ -668,9 +679,9 @@ public class JsonConverter implements Converter, HeaderConverter {
                     ObjectNode obj = null;
                     ArrayNode list = null;
                     if (objectMode)
-                        obj = JsonNodeFactory.instance.objectNode();
+                        obj = JSON_NODE_FACTORY.objectNode();
                     else
-                        list = JsonNodeFactory.instance.arrayNode();
+                        list = JSON_NODE_FACTORY.arrayNode();
                     for (Map.Entry<?, ?> entry : map.entrySet()) {
                         Schema keySchema = schema == null ? null : schema.keySchema();
                         Schema valueSchema = schema == null ? null : schema.valueSchema();
@@ -680,7 +691,7 @@ public class JsonConverter implements Converter, HeaderConverter {
                         if (objectMode)
                             obj.set(mapKey.asText(), mapValue);
                         else
-                            list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
+                            list.add(JSON_NODE_FACTORY.arrayNode().add(mapKey).add(mapValue));
                     }
                     return objectMode ? obj : list;
                 }
@@ -688,7 +699,7 @@ public class JsonConverter implements Converter, HeaderConverter {
                     Struct struct = (Struct) value;
                     if (!struct.schema().equals(schema))
                         throw new DataException("Mismatching schema.");
-                    ObjectNode obj = JsonNodeFactory.instance.objectNode();
+                    ObjectNode obj = JSON_NODE_FACTORY.objectNode();
                     for (Field field : schema.fields()) {
                         obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
                     }
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
index a656e53..2e6e821 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.json;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import java.util.Collections;
 import java.util.Set;
 import org.apache.kafka.common.errors.SerializationException;
@@ -29,13 +30,13 @@ import org.apache.kafka.common.serialization.Deserializer;
  * structured data without having associated Java classes. This deserializer also supports Connect schemas.
  */
 public class JsonDeserializer implements Deserializer<JsonNode> {
-    private ObjectMapper objectMapper = new ObjectMapper();
+    private final ObjectMapper objectMapper = new ObjectMapper();
 
     /**
      * Default constructor needed by Kafka
      */
     public JsonDeserializer() {
-        this(Collections.emptySet());
+        this(Collections.emptySet(), JsonNodeFactory.withExactBigDecimals(true));
     }
 
     /**
@@ -43,9 +44,14 @@ public class JsonDeserializer implements Deserializer<JsonNode> {
      * for the deserializer
      *
      * @param deserializationFeatures the specified deserialization features
+     * @param jsonNodeFactory the json node factory to use.
      */
-    JsonDeserializer(final Set<DeserializationFeature> deserializationFeatures) {
+    JsonDeserializer(
+        final Set<DeserializationFeature> deserializationFeatures,
+        final JsonNodeFactory jsonNodeFactory
+    ) {
         deserializationFeatures.forEach(objectMapper::enable);
+        objectMapper.setNodeFactory(jsonNodeFactory);
     }
 
     @Override
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
index 94ec0a8..0f2b62b 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
@@ -18,9 +18,14 @@ package org.apache.kafka.connect.json;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.Serializer;
 
+import java.util.Collections;
+import java.util.Set;
+
 /**
  * Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree model allows handling arbitrarily
  * structured data without corresponding Java classes. This serializer also supports Connect schemas.
@@ -32,7 +37,22 @@ public class JsonSerializer implements Serializer<JsonNode> {
      * Default constructor needed by Kafka
      */
     public JsonSerializer() {
+        this(Collections.emptySet(), JsonNodeFactory.withExactBigDecimals(true));
+    }
 
+    /**
+     * A constructor that additionally specifies some {@link SerializationFeature}
+     * for the serializer
+     *
+     * @param serializationFeatures the specified serialization features
+     * @param jsonNodeFactory the json node factory to use.
+     */
+    JsonSerializer(
+        final Set<SerializationFeature> serializationFeatures,
+        final JsonNodeFactory jsonNodeFactory
+    ) {
+        serializationFeatures.forEach(objectMapper::enable);
+        objectMapper.setNodeFactory(jsonNodeFactory);
     }
 
     @Override
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index 2a56950..2e189e2 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.json;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -65,8 +66,11 @@ import static org.junit.Assert.fail;
 public class JsonConverterTest {
     private static final String TOPIC = "topic";
 
-    ObjectMapper objectMapper = new ObjectMapper();
-    JsonConverter converter = new JsonConverter();
+    private final ObjectMapper objectMapper = new ObjectMapper()
+        .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
+        .setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));
+
+    private final JsonConverter converter = new JsonConverter();
 
     @Before
     public void setUp() {
@@ -273,6 +277,16 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void numericDecimalWithTrailingZerosToConnect() {
+        BigDecimal reference = new BigDecimal(new BigInteger("15600"), 4);
+        Schema schema = Decimal.schema(4);
+        String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": { \"scale\": \"4\" } }, \"payload\": 1.5600 }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, schemaAndValue.value());
+    }
+
+    @Test
     public void highPrecisionNumericDecimalToConnect() {
         // this number is too big to be kept in a float64!
         BigDecimal reference = new BigDecimal("1.23456789123456789");
@@ -634,7 +648,18 @@ public class JsonConverterTest {
     }
 
     @Test
-    public void decimalToJsonWithoutSchema() throws IOException {
+    public void decimalWithTrailingZerosToNumericJson() {
+        converter.configure(Collections.singletonMap(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name()), false);
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Decimal.schema(4), new BigDecimal(new BigInteger("15600"), 4)));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": { \"scale\": \"4\" } }"),
+            converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertTrue("expected node to be numeric", converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isNumber());
+        assertEquals(new BigDecimal("1.5600"), converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).decimalValue());
+    }
+
+    @Test
+    public void decimalToJsonWithoutSchema() {
         assertThrows(
             "expected data exception when serializing BigDecimal without schema",
             DataException.class,