You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/02/12 22:19:54 UTC

[kafka] branch 2.2 updated: KAFKA-9192: fix NPE when converting optional json schema in structs (#7733)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 8d839e7  KAFKA-9192: fix NPE when converting optional json schema in structs (#7733)
8d839e7 is described below

commit 8d839e7d8e1545985b7513f7b2e6b5735a1617ac
Author: Lev Zemlyanov <lz...@purdue.edu>
AuthorDate: Wed Feb 12 13:44:23 2020 -0800

    KAFKA-9192: fix NPE when converting optional json schema in structs (#7733)
    
    Author: Lev Zemlyanov <le...@confluent.io>
    Reviewers: Greg Harris <gr...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
 .../main/java/org/apache/kafka/connect/json/JsonConverter.java   | 2 +-
 .../java/org/apache/kafka/connect/json/JsonConverterTest.java    | 9 +++++++++
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index 546fcf0..55fe09d 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -698,7 +698,7 @@ public class JsonConverter implements Converter, HeaderConverter {
         final Schema.Type schemaType;
         if (schema != null) {
             schemaType = schema.type();
-            if (jsonValue.isNull()) {
+            if (jsonValue == null || jsonValue.isNull()) {
                 if (schema.defaultValue() != null)
                     return schema.defaultValue(); // any logical type conversions should already have been applied
                 if (schema.isOptional())
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index 003d7e6..f9553b7 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -173,6 +173,15 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void structWithOptionalFieldToConnect() {
+        byte[] structJson = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"field\":\"optional\", \"type\": \"string\", \"optional\": true }, {  \"field\": \"required\", \"type\": \"string\" }] }, \"payload\": { \"required\": \"required\" } }".getBytes();
+        Schema expectedSchema = SchemaBuilder.struct().field("optional", Schema.OPTIONAL_STRING_SCHEMA).field("required", Schema.STRING_SCHEMA).build();
+        Struct expected = new Struct(expectedSchema).put("required", "required");
+        SchemaAndValue converted = converter.toConnectData(TOPIC, structJson);
+        assertEquals(new SchemaAndValue(expectedSchema, expected), converted);
+    }
+
+    @Test
     public void nullToConnect() {
         // When schemas are enabled, trying to decode a tombstone should be an empty envelope
         // the behavior is the same as when the json is "{ "schema": null, "payload": null }"