You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/03/26 19:07:48 UTC

[kafka] branch 2.4 updated: KAFKA-9707: Fix InsertField.Key should apply to keys of tombstone records (#8280)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new d9ecbc2  KAFKA-9707: Fix InsertField.Key should apply to keys of tombstone records (#8280)
d9ecbc2 is described below

commit d9ecbc229b6dff4cf31a26fdcb53dee1441f9939
Author: Greg Harris <gr...@confluent.io>
AuthorDate: Thu Mar 26 10:21:11 2020 -0700

    KAFKA-9707: Fix InsertField.Key should apply to keys of tombstone records (#8280)
    
    * KAFKA-9707: Fix InsertField.Key not applying to tombstone events
    
    * Fix typo that hardcoded .value() instead of abstract operatingValue
    * Add test for Key transform that was previously not tested
    
    Signed-off-by: Greg Harris <gr...@confluent.io>
    
    * Add null value assertion to tombstone test
    
    * Remove mis-named function and add test for passing-through a null-keyed record.
    
    Signed-off-by: Greg Harris <gr...@confluent.io>
    
    * Simplify unchanged record assertion
    
    Signed-off-by: Greg Harris <gr...@confluent.io>
    
    * Replace assertEquals with assertSame
    
    Signed-off-by: Greg Harris <gr...@confluent.io>
    
    * Fix checkstyleTest indent issue
    
    Signed-off-by: Greg Harris <gr...@confluent.io>
---
 .../kafka/connect/transforms/InsertField.java      |  6 +-
 .../kafka/connect/transforms/InsertFieldTest.java  | 70 ++++++++++++++++++----
 2 files changed, 58 insertions(+), 18 deletions(-)

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
index 93ba79c..8f0c5aa 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
@@ -127,7 +127,7 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
 
     @Override
     public R apply(R record) {
-        if (isTombstoneRecord(record)) {
+        if (operatingValue(record) == null) {
             return record;
         } else if (operatingSchema(record) == null) {
             return applySchemaless(record);
@@ -136,10 +136,6 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
         }
     }
 
-    private boolean isTombstoneRecord(R record) {
-        return record.value() == null;
-    }
-
     private R applySchemaless(R record) {
         final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
 
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
index b22872c..dc6611c 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
@@ -33,17 +33,18 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
 
 public class InsertFieldTest {
-    private InsertField<SourceRecord> xform = new InsertField.Value<>();
+    private InsertField<SourceRecord> xformKey = new InsertField.Key<>();
+    private InsertField<SourceRecord> xformValue = new InsertField.Value<>();
 
     @After
     public void teardown() {
-        xform.close();
+        xformValue.close();
     }
 
     @Test(expected = DataException.class)
     public void topLevelStructRequired() {
-        xform.configure(Collections.singletonMap("topic.field", "topic_field"));
-        xform.apply(new SourceRecord(null, null, "", 0, Schema.INT32_SCHEMA, 42));
+        xformValue.configure(Collections.singletonMap("topic.field", "topic_field"));
+        xformValue.apply(new SourceRecord(null, null, "", 0, Schema.INT32_SCHEMA, 42));
     }
 
     @Test
@@ -55,13 +56,13 @@ public class InsertFieldTest {
         props.put("static.field", "instance_id");
         props.put("static.value", "my-instance-id");
 
-        xform.configure(props);
+        xformValue.configure(props);
 
         final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build();
         final Struct simpleStruct = new Struct(simpleStructSchema).put("magic", 42L);
 
         final SourceRecord record = new SourceRecord(null, null, "test", 0, simpleStructSchema, simpleStruct);
-        final SourceRecord transformedRecord = xform.apply(record);
+        final SourceRecord transformedRecord = xformValue.apply(record);
 
         assertEquals(simpleStructSchema.name(), transformedRecord.valueSchema().name());
         assertEquals(simpleStructSchema.version(), transformedRecord.valueSchema().version());
@@ -83,7 +84,7 @@ public class InsertFieldTest {
         assertEquals("my-instance-id", ((Struct) transformedRecord.value()).getString("instance_id"));
 
         // Exercise caching
-        final SourceRecord transformedRecord2 = xform.apply(
+        final SourceRecord transformedRecord2 = xformValue.apply(
                 new SourceRecord(null, null, "test", 1, simpleStructSchema, new Struct(simpleStructSchema)));
         assertSame(transformedRecord.valueSchema(), transformedRecord2.valueSchema());
     }
@@ -97,12 +98,12 @@ public class InsertFieldTest {
         props.put("static.field", "instance_id");
         props.put("static.value", "my-instance-id");
 
-        xform.configure(props);
+        xformValue.configure(props);
 
         final SourceRecord record = new SourceRecord(null, null, "test", 0,
                 null, Collections.singletonMap("magic", 42L));
 
-        final SourceRecord transformedRecord = xform.apply(record);
+        final SourceRecord transformedRecord = xformValue.apply(record);
 
         assertEquals(42L, ((Map<?, ?>) transformedRecord.value()).get("magic"));
         assertEquals("test", ((Map<?, ?>) transformedRecord.value()).get("topic_field"));
@@ -121,12 +122,12 @@ public class InsertFieldTest {
         props.put("static.field", "instance_id");
         props.put("static.value", "my-instance-id");
 
-        xform.configure(props);
+        xformValue.configure(props);
 
         final SourceRecord record = new SourceRecord(null, null, "test", 0,
                 null, null);
 
-        final SourceRecord transformedRecord = xform.apply(record);
+        final SourceRecord transformedRecord = xformValue.apply(record);
 
         assertEquals(null, transformedRecord.value());
         assertEquals(null, transformedRecord.valueSchema());
@@ -141,16 +142,59 @@ public class InsertFieldTest {
         props.put("static.field", "instance_id");
         props.put("static.value", "my-instance-id");
 
-        xform.configure(props);
+        xformValue.configure(props);
 
         final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build();
 
         final SourceRecord record = new SourceRecord(null, null, "test", 0,
                 simpleStructSchema, null);
 
-        final SourceRecord transformedRecord = xform.apply(record);
+        final SourceRecord transformedRecord = xformValue.apply(record);
 
         assertEquals(null, transformedRecord.value());
         assertEquals(simpleStructSchema, transformedRecord.valueSchema());
     }
+
+    @Test
+    public void insertKeyFieldsIntoTombstoneEvent() {
+        final Map<String, Object> props = new HashMap<>();
+        props.put("topic.field", "topic_field!");
+        props.put("partition.field", "partition_field");
+        props.put("timestamp.field", "timestamp_field?");
+        props.put("static.field", "instance_id");
+        props.put("static.value", "my-instance-id");
+
+        xformKey.configure(props);
+
+        final SourceRecord record = new SourceRecord(null, null, "test", 0,
+            null, Collections.singletonMap("magic", 42L), null, null);
+
+        final SourceRecord transformedRecord = xformKey.apply(record);
+
+        assertEquals(42L, ((Map<?, ?>) transformedRecord.key()).get("magic"));
+        assertEquals("test", ((Map<?, ?>) transformedRecord.key()).get("topic_field"));
+        assertEquals(0, ((Map<?, ?>) transformedRecord.key()).get("partition_field"));
+        assertEquals(null, ((Map<?, ?>) transformedRecord.key()).get("timestamp_field"));
+        assertEquals("my-instance-id", ((Map<?, ?>) transformedRecord.key()).get("instance_id"));
+        assertEquals(null, transformedRecord.value());
+    }
+
+    @Test
+    public void insertIntoNullKeyLeavesRecordUnchanged() {
+        final Map<String, Object> props = new HashMap<>();
+        props.put("topic.field", "topic_field!");
+        props.put("partition.field", "partition_field");
+        props.put("timestamp.field", "timestamp_field?");
+        props.put("static.field", "instance_id");
+        props.put("static.value", "my-instance-id");
+
+        xformKey.configure(props);
+
+        final SourceRecord record = new SourceRecord(null, null, "test", 0,
+            null, null, null, Collections.singletonMap("magic", 42L));
+
+        final SourceRecord transformedRecord = xformKey.apply(record);
+
+        assertSame(record, transformedRecord);
+    }
 }