You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/02/12 23:08:43 UTC

[kafka] branch 2.4 updated: allow ReplaceField SMT to handle tombstone records (#7731)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new b2e4fa3  allow ReplaceField SMT to handle tombstone records (#7731)
b2e4fa3 is described below

commit b2e4fa3320cf460b55e345c5ba1ee7c563411ce5
Author: Lev Zemlyanov <lz...@purdue.edu>
AuthorDate: Wed Feb 12 14:36:06 2020 -0800

    allow ReplaceField SMT to handle tombstone records (#7731)
    
    Signed-off-by: Lev Zemlyanov <le...@confluent.io>
---
 .../kafka/connect/transforms/ReplaceField.java     |  4 ++-
 .../kafka/connect/transforms/ReplaceFieldTest.java | 38 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
index f071bda..3d9abc2 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
@@ -123,7 +123,9 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
 
     @Override
     public R apply(R record) {
-        if (operatingSchema(record) == null) {
+        if (operatingValue(record) == null) {
+            return record;
+        } else if (operatingSchema(record) == null) {
             return applySchemaless(record);
         } else {
             return applyWithSchema(record);
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
index 6a1a13a..7ab00ed 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class ReplaceFieldTest {
     private ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
@@ -37,6 +38,43 @@ public class ReplaceFieldTest {
     }
 
     @Test
+    public void tombstoneSchemaless() {
+        final Map<String, String> props = new HashMap<>();
+        props.put("whitelist", "abc,foo");
+        props.put("renames", "abc:xyz,foo:bar");
+
+        xform.configure(props);
+
+        final SinkRecord record = new SinkRecord("test", 0, null, null, null, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertNull(transformedRecord.value());
+        assertNull(transformedRecord.valueSchema());
+    }
+
+    @Test
+    public void tombstoneWithSchema() {
+        final Map<String, String> props = new HashMap<>();
+        props.put("whitelist", "abc,foo");
+        props.put("renames", "abc:xyz,foo:bar");
+
+        xform.configure(props);
+
+        final Schema schema = SchemaBuilder.struct()
+            .field("dont", Schema.STRING_SCHEMA)
+            .field("abc", Schema.INT32_SCHEMA)
+            .field("foo", Schema.BOOLEAN_SCHEMA)
+            .field("etc", Schema.STRING_SCHEMA)
+            .build();
+
+        final SinkRecord record = new SinkRecord("test", 0, null, null, schema, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertNull(transformedRecord.value());
+        assertEquals(schema, transformedRecord.valueSchema());
+    }
+
+    @Test
     public void schemaless() {
         final Map<String, String> props = new HashMap<>();
         props.put("blacklist", "dont");