You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2021/02/18 15:03:21 UTC
[kafka] branch trunk updated: KAFKA-12303: Fix handling of null values by Flatten SMT (#10073)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b58b944 KAFKA-12303: Fix handling of null values by Flatten SMT (#10073)
b58b944 is described below
commit b58b944e70780029806a6004114bba43ce5b9483
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Thu Feb 18 10:01:49 2021 -0500
KAFKA-12303: Fix handling of null values by Flatten SMT (#10073)
Reviewers: Mickael Maison <mi...@gmail.com>, Greg Harris <gr...@confluent.io>
---
.../apache/kafka/connect/transforms/Flatten.java | 2 +-
.../kafka/connect/transforms/FlattenTest.java | 44 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 1 deletion(-)
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index cad8d79..782cb3a 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -106,7 +106,7 @@ public abstract class Flatten<R extends ConnectRecord<R>> implements Transformat
Object value = entry.getValue();
if (value == null) {
newRecord.put(fieldName(fieldNamePrefix, entry.getKey()), null);
- return;
+ continue;
}
Schema.Type inferredType = ConnectSchema.schemaType(value.getClass());
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index 541ca14..af311bc 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -326,4 +327,47 @@ public class FlattenTest {
assertNull(transformedRecord.value());
assertEquals(simpleStructSchema, transformedRecord.valueSchema());
}
+
+ @Test
+ public void testMapWithNullFields() {
+ xformValue.configure(Collections.emptyMap());
+
+ // Use a LinkedHashMap to ensure the SMT sees entries in a specific order
+ Map<String, Object> value = new LinkedHashMap<>();
+ value.put("firstNull", null);
+ value.put("firstNonNull", "nonNull");
+ value.put("secondNull", null);
+ value.put("secondNonNull", "alsoNonNull");
+ value.put("thirdNonNull", null);
+
+ final SourceRecord record = new SourceRecord(null, null, "test", 0, null, value);
+ final SourceRecord transformedRecord = xformValue.apply(record);
+
+ assertEquals(value, transformedRecord.value());
+ }
+
+ @Test
+ public void testStructWithNullFields() {
+ xformValue.configure(Collections.emptyMap());
+
+ final Schema structSchema = SchemaBuilder.struct()
+ .field("firstNull", Schema.OPTIONAL_STRING_SCHEMA)
+ .field("firstNonNull", Schema.OPTIONAL_STRING_SCHEMA)
+ .field("secondNull", Schema.OPTIONAL_STRING_SCHEMA)
+ .field("secondNonNull", Schema.OPTIONAL_STRING_SCHEMA)
+ .field("thirdNonNull", Schema.OPTIONAL_STRING_SCHEMA)
+ .build();
+
+ final Struct value = new Struct(structSchema);
+ value.put("firstNull", null);
+ value.put("firstNonNull", "nonNull");
+ value.put("secondNull", null);
+ value.put("secondNonNull", "alsoNonNull");
+ value.put("thirdNonNull", null);
+
+ final SourceRecord record = new SourceRecord(null, null, "test", 0, structSchema, value);
+ final SourceRecord transformedRecord = xformValue.apply(record);
+
+ assertEquals(value, transformedRecord.value());
+ }
}