You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/12/28 17:47:19 UTC
[kafka] branch 2.0 updated: KAFKA-3832;
Kafka Connect's JSON Converter never outputs a null value (#6027)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new df5336e KAFKA-3832; Kafka Connect's JSON Converter never outputs a null value (#6027)
df5336e is described below
commit df5336e1aa4e6537b94645bb4c992f50f0093d81
Author: Renato Mefi <gh...@mefi.in>
AuthorDate: Fri Dec 28 18:39:52 2018 +0100
KAFKA-3832; Kafka Connect's JSON Converter never outputs a null value (#6027)
When using the Connect `JsonConverter`, it's impossible to produce tombstone messages, thus impacting the compaction of the topic. This patch allows the converter, with and without schemas, to output a NULL byte value in order to have a proper tombstone message. When converting this data back into a Connect record, the approach is the same as when the payload looks like `"{ "schema": null, "payload": null }"`; this way the sink connectors can maintain their functionality and reduce [...]
Reviewers: Gunnar Morling <gu...@googlemail.com>, Randall Hauch <rh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../apache/kafka/connect/json/JsonConverter.java | 31 ++++++++++++----------
.../kafka/connect/json/JsonConverterTest.java | 28 +++++++++++++++----
2 files changed, 40 insertions(+), 19 deletions(-)
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index c1322b1..546fcf0 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -317,6 +317,10 @@ public class JsonConverter implements Converter, HeaderConverter {
@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
+ if (schema == null && value == null) {
+ return null;
+ }
+
JsonNode jsonValue = enableSchemas ? convertToJsonWithEnvelope(schema, value) : convertToJsonWithoutEnvelope(schema, value);
try {
return serializer.serialize(topic, jsonValue);
@@ -328,13 +332,19 @@ public class JsonConverter implements Converter, HeaderConverter {
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
JsonNode jsonValue;
+
+ // This handles a tombstone message
+ if (value == null) {
+ return SchemaAndValue.NULL;
+ }
+
try {
jsonValue = deserializer.deserialize(topic, value);
} catch (SerializationException e) {
throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e);
}
- if (enableSchemas && (jsonValue == null || !jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has("schema") || !jsonValue.has("payload")))
+ if (enableSchemas && (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)))
throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." +
" If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.");
@@ -342,23 +352,16 @@ public class JsonConverter implements Converter, HeaderConverter {
// was stripped during serialization and we need to fill in an all-encompassing schema.
if (!enableSchemas) {
ObjectNode envelope = JsonNodeFactory.instance.objectNode();
- envelope.set("schema", null);
- envelope.set("payload", jsonValue);
+ envelope.set(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME, null);
+ envelope.set(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME, jsonValue);
jsonValue = envelope;
}
- return jsonToConnect(jsonValue);
- }
-
- private SchemaAndValue jsonToConnect(JsonNode jsonValue) {
- if (jsonValue == null)
- return SchemaAndValue.NULL;
-
- if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
- throw new DataException("JSON value converted to Kafka Connect must be in envelope containing schema");
-
Schema schema = asConnectSchema(jsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
- return new SchemaAndValue(schema, convertToConnect(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)));
+ return new SchemaAndValue(
+ schema,
+ convertToConnect(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
+ );
}
public ObjectNode asJsonSchema(Schema schema) {
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index 7686fdb..d5bb24c 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -172,10 +172,13 @@ public class JsonConverterTest {
assertEquals(new SchemaAndValue(expectedSchema, expected), converted);
}
- @Test(expected = DataException.class)
+ @Test
public void nullToConnect() {
- // When schemas are enabled, trying to decode a null should be an error -- we should *always* have the envelope
- assertEquals(SchemaAndValue.NULL, converter.toConnectData(TOPIC, null));
+ // When schemas are enabled, trying to decode a tombstone should be an empty envelope
+ // the behavior is the same as when the json is "{ "schema": null, "payload": null }"
+ // to keep compatibility with the record
+ SchemaAndValue converted = converter.toConnectData(TOPIC, null);
+ assertEquals(SchemaAndValue.NULL, converted);
}
@Test
@@ -696,6 +699,23 @@ public class JsonConverterTest {
);
}
+ @Test
+ public void nullSchemaAndNullValueToJson() {
+ // This characterizes the production of tombstone messages when Json schemas is enabled
+ Map<String, Boolean> props = Collections.singletonMap("schemas.enable", true);
+ converter.configure(props, true);
+ byte[] converted = converter.fromConnectData(TOPIC, null, null);
+ assertNull(converted);
+ }
+
+ @Test
+ public void nullValueToJson() {
+ // This characterizes the production of tombstone messages when Json schemas is not enabled
+ Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
+ converter.configure(props, true);
+ byte[] converted = converter.fromConnectData(TOPIC, null, null);
+ assertNull(converted);
+ }
@Test(expected = DataException.class)
public void mismatchSchemaJson() {
@@ -703,8 +723,6 @@ public class JsonConverterTest {
converter.fromConnectData(TOPIC, Schema.FLOAT64_SCHEMA, true);
}
-
-
@Test
public void noSchemaToConnect() {
Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);