You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/02/12 22:19:54 UTC
[kafka] branch 2.2 updated: KAFKA-9192: fix NPE when converting
optional json schema in structs (#7733)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new 8d839e7 KAFKA-9192: fix NPE when converting optional json schema in structs (#7733)
8d839e7 is described below
commit 8d839e7d8e1545985b7513f7b2e6b5735a1617ac
Author: Lev Zemlyanov <lz...@purdue.edu>
AuthorDate: Wed Feb 12 13:44:23 2020 -0800
KAFKA-9192: fix NPE when converting optional json schema in structs (#7733)
Author: Lev Zemlyanov <le...@confluent.io>
Reviewers: Greg Harris <gr...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
.../main/java/org/apache/kafka/connect/json/JsonConverter.java | 2 +-
.../java/org/apache/kafka/connect/json/JsonConverterTest.java | 9 +++++++++
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index 546fcf0..55fe09d 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -698,7 +698,7 @@ public class JsonConverter implements Converter, HeaderConverter {
final Schema.Type schemaType;
if (schema != null) {
schemaType = schema.type();
- if (jsonValue.isNull()) {
+ if (jsonValue == null || jsonValue.isNull()) {
if (schema.defaultValue() != null)
return schema.defaultValue(); // any logical type conversions should already have been applied
if (schema.isOptional())
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index 003d7e6..f9553b7 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -173,6 +173,15 @@ public class JsonConverterTest {
}
@Test
+ public void structWithOptionalFieldToConnect() {
+ byte[] structJson = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"field\":\"optional\", \"type\": \"string\", \"optional\": true }, { \"field\": \"required\", \"type\": \"string\" }] }, \"payload\": { \"required\": \"required\" } }".getBytes();
+ Schema expectedSchema = SchemaBuilder.struct().field("optional", Schema.OPTIONAL_STRING_SCHEMA).field("required", Schema.STRING_SCHEMA).build();
+ Struct expected = new Struct(expectedSchema).put("required", "required");
+ SchemaAndValue converted = converter.toConnectData(TOPIC, structJson);
+ assertEquals(new SchemaAndValue(expectedSchema, expected), converted);
+ }
+
+ @Test
public void nullToConnect() {
// When schemas are enabled, trying to decode a tombstone should be an empty envelope
// the behavior is the same as when the json is "{ "schema": null, "payload": null }"