Posted to commits@kafka.apache.org by jg...@apache.org on 2018/12/28 17:47:19 UTC

[kafka] branch 2.0 updated: KAFKA-3832; Kafka Connect's JSON Converter never outputs a null value (#6027)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new df5336e  KAFKA-3832; Kafka Connect's JSON Converter never outputs a null value (#6027)
df5336e is described below

commit df5336e1aa4e6537b94645bb4c992f50f0093d81
Author: Renato Mefi <gh...@mefi.in>
AuthorDate: Fri Dec 28 18:39:52 2018 +0100

    KAFKA-3832; Kafka Connect's JSON Converter never outputs a null value (#6027)
    
    When using the Connect `JsonConverter`, it's impossible to produce tombstone messages, which impacts the compaction of the topic. This patch allows the converter, both with and without schemas, to output a NULL byte value so that a proper tombstone message can be produced. When converting this data back into a Connect record, the approach is the same as when the payload looks like `"{ "schema": null, "payload": null }"`; this way the sink connectors can maintain their functionality and reduc [...]
    
    Reviewers: Gunnar Morling <gu...@googlemail.com>, Randall Hauch <rh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../apache/kafka/connect/json/JsonConverter.java   | 31 ++++++++++++----------
 .../kafka/connect/json/JsonConverterTest.java      | 28 +++++++++++++++----
 2 files changed, 40 insertions(+), 19 deletions(-)
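
For illustration only (not part of this commit): a minimal sketch of the round-trip behavior the patch enables, assuming a standalone `JsonConverter` used as a value converter. The class and topic name below are made up for the example.

    import java.util.Collections;

    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.json.JsonConverter;

    public class TombstoneRoundTrip {
        public static void main(String[] args) {
            JsonConverter converter = new JsonConverter();
            // isKey = false: configure as a value converter
            converter.configure(Collections.singletonMap("schemas.enable", true), false);

            // After this patch, a null schema and a null value serialize to a null
            // byte[], which Kafka treats as a tombstone on a compacted topic.
            byte[] serialized = converter.fromConnectData("my-topic", null, null);
            System.out.println(serialized == null);  // true

            // Deserializing a tombstone yields SchemaAndValue.NULL instead of throwing.
            SchemaAndValue deserialized = converter.toConnectData("my-topic", null);
            System.out.println(deserialized == SchemaAndValue.NULL);  // true
        }
    }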

diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index c1322b1..546fcf0 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -317,6 +317,10 @@ public class JsonConverter implements Converter, HeaderConverter {
 
     @Override
     public byte[] fromConnectData(String topic, Schema schema, Object value) {
+        if (schema == null && value == null) {
+            return null;
+        }
+
         JsonNode jsonValue = enableSchemas ? convertToJsonWithEnvelope(schema, value) : convertToJsonWithoutEnvelope(schema, value);
         try {
             return serializer.serialize(topic, jsonValue);
@@ -328,13 +332,19 @@ public class JsonConverter implements Converter, HeaderConverter {
     @Override
     public SchemaAndValue toConnectData(String topic, byte[] value) {
         JsonNode jsonValue;
+
+        // This handles a tombstone message
+        if (value == null) {
+            return SchemaAndValue.NULL;
+        }
+
         try {
             jsonValue = deserializer.deserialize(topic, value);
         } catch (SerializationException e) {
             throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e);
         }
 
-        if (enableSchemas && (jsonValue == null || !jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has("schema") || !jsonValue.has("payload")))
+        if (enableSchemas && (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)))
             throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." +
                     " If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.");
 
@@ -342,23 +352,16 @@ public class JsonConverter implements Converter, HeaderConverter {
         // was stripped during serialization and we need to fill in an all-encompassing schema.
         if (!enableSchemas) {
             ObjectNode envelope = JsonNodeFactory.instance.objectNode();
-            envelope.set("schema", null);
-            envelope.set("payload", jsonValue);
+            envelope.set(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME, null);
+            envelope.set(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME, jsonValue);
             jsonValue = envelope;
         }
 
-        return jsonToConnect(jsonValue);
-    }
-
-    private SchemaAndValue jsonToConnect(JsonNode jsonValue) {
-        if (jsonValue == null)
-            return SchemaAndValue.NULL;
-
-        if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
-            throw new DataException("JSON value converted to Kafka Connect must be in envelope containing schema");
-
         Schema schema = asConnectSchema(jsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        return new SchemaAndValue(schema, convertToConnect(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)));
+        return new SchemaAndValue(
+                schema,
+                convertToConnect(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
+        );
     }
 
     public ObjectNode asJsonSchema(Schema schema) {
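
As context for the hunk above (not taken from the commit): with `schemas.enable=true`, the serialized form is an envelope with exactly two fields, "schema" and "payload" (the `JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME` and `JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME` constants referenced above). A rough sketch of deserializing such an envelope; the topic name and payload are made up:

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;

    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.json.JsonConverter;

    public class EnvelopeExample {
        public static void main(String[] args) {
            JsonConverter converter = new JsonConverter();
            converter.configure(Collections.singletonMap("schemas.enable", true), false);

            // A two-field {"schema": ..., "payload": ...} envelope carrying a string.
            // Any other shape is rejected with the DataException shown above.
            String envelope =
                    "{\"schema\":{\"type\":\"string\",\"optional\":false},\"payload\":\"hello\"}";
            SchemaAndValue result =
                    converter.toConnectData("my-topic", envelope.getBytes(StandardCharsets.UTF_8));
            System.out.println(result.value());  // hello
        }
    }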
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index 7686fdb..d5bb24c 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -172,10 +172,13 @@ public class JsonConverterTest {
         assertEquals(new SchemaAndValue(expectedSchema, expected), converted);
     }
 
-    @Test(expected = DataException.class)
+    @Test
     public void nullToConnect() {
-        // When schemas are enabled, trying to decode a null should be an error -- we should *always* have the envelope
-        assertEquals(SchemaAndValue.NULL, converter.toConnectData(TOPIC, null));
+        // When schemas are enabled, decoding a tombstone should yield an empty envelope;
+        // the behavior is the same as when the JSON is "{ "schema": null, "payload": null }",
+        // which keeps compatibility with the record
+        SchemaAndValue converted = converter.toConnectData(TOPIC, null);
+        assertEquals(SchemaAndValue.NULL, converted);
     }
 
     @Test
@@ -696,6 +699,23 @@ public class JsonConverterTest {
         );
     }
 
+    @Test
+    public void nullSchemaAndNullValueToJson() {
+        // This characterizes the production of tombstone messages when JSON schemas are enabled
+        Map<String, Boolean> props = Collections.singletonMap("schemas.enable", true);
+        converter.configure(props, true);
+        byte[] converted = converter.fromConnectData(TOPIC, null, null);
+        assertNull(converted);
+    }
+
+    @Test
+    public void nullValueToJson() {
+        // This characterizes the production of tombstone messages when JSON schemas are not enabled
+        Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
+        converter.configure(props, true);
+        byte[] converted = converter.fromConnectData(TOPIC, null, null);
+        assertNull(converted);
+    }
 
     @Test(expected = DataException.class)
     public void mismatchSchemaJson() {
@@ -703,8 +723,6 @@ public class JsonConverterTest {
         converter.fromConnectData(TOPIC, Schema.FLOAT64_SCHEMA, true);
     }
 
-
-
     @Test
     public void noSchemaToConnect() {
         Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);