You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2021/02/18 15:03:21 UTC

[kafka] branch trunk updated: KAFKA-12303: Fix handling of null values by Flatten SMT (#10073)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b58b944  KAFKA-12303: Fix handling of null values by Flatten SMT (#10073)
b58b944 is described below

commit b58b944e70780029806a6004114bba43ce5b9483
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Thu Feb 18 10:01:49 2021 -0500

    KAFKA-12303: Fix handling of null values by Flatten SMT (#10073)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Greg Harris <gr...@confluent.io>
---
 .../apache/kafka/connect/transforms/Flatten.java   |  2 +-
 .../kafka/connect/transforms/FlattenTest.java      | 44 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index cad8d79..782cb3a 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -106,7 +106,7 @@ public abstract class Flatten<R extends ConnectRecord<R>> implements Transformat
             Object value = entry.getValue();
             if (value == null) {
                 newRecord.put(fieldName(fieldNamePrefix, entry.getKey()), null);
-                return;
+                continue;
             }
 
             Schema.Type inferredType = ConnectSchema.schemaType(value.getClass());
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index 541ca14..af311bc 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -326,4 +327,47 @@ public class FlattenTest {
         assertNull(transformedRecord.value());
         assertEquals(simpleStructSchema, transformedRecord.valueSchema());
     }
+
+    @Test
+    public void testMapWithNullFields() {
+        xformValue.configure(Collections.emptyMap());
+
+        // Use a LinkedHashMap to ensure the SMT sees entries in a specific order
+        Map<String, Object> value = new LinkedHashMap<>();
+        value.put("firstNull", null);
+        value.put("firstNonNull", "nonNull");
+        value.put("secondNull", null);
+        value.put("secondNonNull", "alsoNonNull");
+        value.put("thirdNonNull", null);
+
+        final SourceRecord record = new SourceRecord(null, null, "test", 0, null, value);
+        final SourceRecord transformedRecord = xformValue.apply(record);
+
+        assertEquals(value, transformedRecord.value());
+    }
+
+    @Test
+    public void testStructWithNullFields() {
+        xformValue.configure(Collections.emptyMap());
+
+        final Schema structSchema = SchemaBuilder.struct()
+            .field("firstNull", Schema.OPTIONAL_STRING_SCHEMA)
+            .field("firstNonNull", Schema.OPTIONAL_STRING_SCHEMA)
+            .field("secondNull", Schema.OPTIONAL_STRING_SCHEMA)
+            .field("secondNonNull", Schema.OPTIONAL_STRING_SCHEMA)
+            .field("thirdNonNull", Schema.OPTIONAL_STRING_SCHEMA)
+            .build();
+
+        final Struct value = new Struct(structSchema);
+        value.put("firstNull", null);
+        value.put("firstNonNull", "nonNull");
+        value.put("secondNull", null);
+        value.put("secondNonNull", "alsoNonNull");
+        value.put("thirdNonNull", null);
+
+        final SourceRecord record = new SourceRecord(null, null, "test", 0, structSchema, value);
+        final SourceRecord transformedRecord = xformValue.apply(record);
+
+        assertEquals(value, transformedRecord.value());
+    }
 }