You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/03/30 22:42:20 UTC
[kafka] branch 2.5 updated: KAFKA-9706: Handle null in keys or values when Flatten transformation is used (#8279)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new f534da0 KAFKA-9706: Handle null in keys or values when Flatten transformation is used (#8279)
f534da0 is described below
commit f534da078eaf3a3b0b5471522b22b99acc9e4837
Author: Greg Harris <gr...@confluent.io>
AuthorDate: Mon Mar 30 15:09:27 2020 -0700
KAFKA-9706: Handle null in keys or values when Flatten transformation is used (#8279)
* Fixed DataException thrown when handling tombstone events with null value
* Passes the original record through unchanged when the key is null and the transformation is configured for keys, or when the value is null and it is configured for values.
* Added unit tests for schema and schemaless data
---
.../apache/kafka/connect/transforms/Flatten.java | 4 +++-
.../kafka/connect/transforms/FlattenTest.java | 25 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index 1a4be33..cad8d79 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -69,7 +69,9 @@ public abstract class Flatten<R extends ConnectRecord<R>> implements Transformat
@Override
public R apply(R record) {
- if (operatingSchema(record) == null) {
+ if (operatingValue(record) == null) {
+ return record;
+ } else if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index 2e7be95..d044338 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -297,4 +297,29 @@ public class FlattenTest {
Schema transformedOptFieldSchema = SchemaBuilder.string().optional().defaultValue("child_default").build();
assertEquals(transformedOptFieldSchema, transformedSchema.field("opt_field").schema());
}
+
+ @Test
+ public void tombstoneEventWithoutSchemaShouldPassThrough() {
+ xformValue.configure(Collections.<String, String>emptyMap());
+
+ final SourceRecord record = new SourceRecord(null, null, "test", 0,
+ null, null);
+ final SourceRecord transformedRecord = xformValue.apply(record);
+
+ assertEquals(null, transformedRecord.value());
+ assertEquals(null, transformedRecord.valueSchema());
+ }
+
+ @Test
+ public void tombstoneEventWithSchemaShouldPassThrough() {
+ xformValue.configure(Collections.<String, String>emptyMap());
+
+ final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build();
+ final SourceRecord record = new SourceRecord(null, null, "test", 0,
+ simpleStructSchema, null);
+ final SourceRecord transformedRecord = xformValue.apply(record);
+
+ assertEquals(null, transformedRecord.value());
+ assertEquals(simpleStructSchema, transformedRecord.valueSchema());
+ }
}