You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/03/30 22:42:20 UTC

[kafka] branch 2.5 updated: KAFKA-9706: Handle null in keys or values when Flatten transformation is used (#8279)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new f534da0  KAFKA-9706: Handle null in keys or values when Flatten transformation is used (#8279)
f534da0 is described below

commit f534da078eaf3a3b0b5471522b22b99acc9e4837
Author: Greg Harris <gr...@confluent.io>
AuthorDate: Mon Mar 30 15:09:27 2020 -0700

    KAFKA-9706: Handle null in keys or values when Flatten transformation is used (#8279)
    
    * Fixed DataException thrown when handling tombstone events with null value
    * Passes through original record when finding a null key when it's configured for keys or a null value when it's configured for values.
    * Added unit tests for schema and schemaless data
---
 .../apache/kafka/connect/transforms/Flatten.java   |  4 +++-
 .../kafka/connect/transforms/FlattenTest.java      | 25 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index 1a4be33..cad8d79 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -69,7 +69,9 @@ public abstract class Flatten<R extends ConnectRecord<R>> implements Transformat
 
     @Override
     public R apply(R record) {
-        if (operatingSchema(record) == null) {
+        if (operatingValue(record) == null) {
+            return record;
+        } else if (operatingSchema(record) == null) {
             return applySchemaless(record);
         } else {
             return applyWithSchema(record);
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index 2e7be95..d044338 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -297,4 +297,29 @@ public class FlattenTest {
         Schema transformedOptFieldSchema = SchemaBuilder.string().optional().defaultValue("child_default").build();
         assertEquals(transformedOptFieldSchema, transformedSchema.field("opt_field").schema());
     }
+
+    @Test
+    public void tombstoneEventWithoutSchemaShouldPassThrough() {
+        xformValue.configure(Collections.<String, String>emptyMap());
+
+        final SourceRecord record = new SourceRecord(null, null, "test", 0,
+                null, null);
+        final SourceRecord transformedRecord = xformValue.apply(record);
+
+        assertEquals(null, transformedRecord.value());
+        assertEquals(null, transformedRecord.valueSchema());
+    }
+
+    @Test
+    public void tombstoneEventWithSchemaShouldPassThrough() {
+        xformValue.configure(Collections.<String, String>emptyMap());
+
+        final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build();
+        final SourceRecord record = new SourceRecord(null, null, "test", 0,
+                simpleStructSchema, null);
+        final SourceRecord transformedRecord = xformValue.apply(record);
+
+        assertEquals(null, transformedRecord.value());
+        assertEquals(simpleStructSchema, transformedRecord.valueSchema());
+    }
 }