You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/03/26 19:08:37 UTC
[kafka] branch 2.2 updated: KAFKA-9707: Fix InsertField.Key should
apply to keys of tombstone records (#8280)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new 5f3ba14 KAFKA-9707: Fix InsertField.Key should apply to keys of tombstone records (#8280)
5f3ba14 is described below
commit 5f3ba14e5c001ebb5189ea3764ba3929a04f6160
Author: Greg Harris <gr...@confluent.io>
AuthorDate: Thu Mar 26 10:21:11 2020 -0700
KAFKA-9707: Fix InsertField.Key should apply to keys of tombstone records (#8280)
* KAFKA-9707: Fix InsertField.Key not applying to tombstone events
* Fix typo that hardcoded .value() instead of abstract operatingValue
* Add test for Key transform that was previously not tested
Signed-off-by: Greg Harris <gr...@confluent.io>
* Add null value assertion to tombstone test
* Remove mis-named function and add test for passing-through a null-keyed record.
Signed-off-by: Greg Harris <gr...@confluent.io>
* Simplify unchanged record assertion
Signed-off-by: Greg Harris <gr...@confluent.io>
* Replace assertEquals with assertSame
Signed-off-by: Greg Harris <gr...@confluent.io>
* Fix checkstyleTest indent issue
Signed-off-by: Greg Harris <gr...@confluent.io>
---
.../kafka/connect/transforms/InsertField.java | 6 +-
.../kafka/connect/transforms/InsertFieldTest.java | 70 ++++++++++++++++++----
2 files changed, 58 insertions(+), 18 deletions(-)
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
index 93ba79c..8f0c5aa 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
@@ -127,7 +127,7 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
@Override
public R apply(R record) {
- if (isTombstoneRecord(record)) {
+ if (operatingValue(record) == null) {
return record;
} else if (operatingSchema(record) == null) {
return applySchemaless(record);
@@ -136,10 +136,6 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
}
}
- private boolean isTombstoneRecord(R record) {
- return record.value() == null;
- }
-
private R applySchemaless(R record) {
final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
index b22872c..dc6611c 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
@@ -33,17 +33,18 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
public class InsertFieldTest {
- private InsertField<SourceRecord> xform = new InsertField.Value<>();
+ private InsertField<SourceRecord> xformKey = new InsertField.Key<>();
+ private InsertField<SourceRecord> xformValue = new InsertField.Value<>();
@After
public void teardown() {
- xform.close();
+ xformValue.close();
}
@Test(expected = DataException.class)
public void topLevelStructRequired() {
- xform.configure(Collections.singletonMap("topic.field", "topic_field"));
- xform.apply(new SourceRecord(null, null, "", 0, Schema.INT32_SCHEMA, 42));
+ xformValue.configure(Collections.singletonMap("topic.field", "topic_field"));
+ xformValue.apply(new SourceRecord(null, null, "", 0, Schema.INT32_SCHEMA, 42));
}
@Test
@@ -55,13 +56,13 @@ public class InsertFieldTest {
props.put("static.field", "instance_id");
props.put("static.value", "my-instance-id");
- xform.configure(props);
+ xformValue.configure(props);
final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build();
final Struct simpleStruct = new Struct(simpleStructSchema).put("magic", 42L);
final SourceRecord record = new SourceRecord(null, null, "test", 0, simpleStructSchema, simpleStruct);
- final SourceRecord transformedRecord = xform.apply(record);
+ final SourceRecord transformedRecord = xformValue.apply(record);
assertEquals(simpleStructSchema.name(), transformedRecord.valueSchema().name());
assertEquals(simpleStructSchema.version(), transformedRecord.valueSchema().version());
@@ -83,7 +84,7 @@ public class InsertFieldTest {
assertEquals("my-instance-id", ((Struct) transformedRecord.value()).getString("instance_id"));
// Exercise caching
- final SourceRecord transformedRecord2 = xform.apply(
+ final SourceRecord transformedRecord2 = xformValue.apply(
new SourceRecord(null, null, "test", 1, simpleStructSchema, new Struct(simpleStructSchema)));
assertSame(transformedRecord.valueSchema(), transformedRecord2.valueSchema());
}
@@ -97,12 +98,12 @@ public class InsertFieldTest {
props.put("static.field", "instance_id");
props.put("static.value", "my-instance-id");
- xform.configure(props);
+ xformValue.configure(props);
final SourceRecord record = new SourceRecord(null, null, "test", 0,
null, Collections.singletonMap("magic", 42L));
- final SourceRecord transformedRecord = xform.apply(record);
+ final SourceRecord transformedRecord = xformValue.apply(record);
assertEquals(42L, ((Map<?, ?>) transformedRecord.value()).get("magic"));
assertEquals("test", ((Map<?, ?>) transformedRecord.value()).get("topic_field"));
@@ -121,12 +122,12 @@ public class InsertFieldTest {
props.put("static.field", "instance_id");
props.put("static.value", "my-instance-id");
- xform.configure(props);
+ xformValue.configure(props);
final SourceRecord record = new SourceRecord(null, null, "test", 0,
null, null);
- final SourceRecord transformedRecord = xform.apply(record);
+ final SourceRecord transformedRecord = xformValue.apply(record);
assertEquals(null, transformedRecord.value());
assertEquals(null, transformedRecord.valueSchema());
@@ -141,16 +142,59 @@ public class InsertFieldTest {
props.put("static.field", "instance_id");
props.put("static.value", "my-instance-id");
- xform.configure(props);
+ xformValue.configure(props);
final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build();
final SourceRecord record = new SourceRecord(null, null, "test", 0,
simpleStructSchema, null);
- final SourceRecord transformedRecord = xform.apply(record);
+ final SourceRecord transformedRecord = xformValue.apply(record);
assertEquals(null, transformedRecord.value());
assertEquals(simpleStructSchema, transformedRecord.valueSchema());
}
+
+ @Test
+ public void insertKeyFieldsIntoTombstoneEvent() {
+ final Map<String, Object> props = new HashMap<>();
+ props.put("topic.field", "topic_field!");
+ props.put("partition.field", "partition_field");
+ props.put("timestamp.field", "timestamp_field?");
+ props.put("static.field", "instance_id");
+ props.put("static.value", "my-instance-id");
+
+ xformKey.configure(props);
+
+ final SourceRecord record = new SourceRecord(null, null, "test", 0,
+ null, Collections.singletonMap("magic", 42L), null, null);
+
+ final SourceRecord transformedRecord = xformKey.apply(record);
+
+ assertEquals(42L, ((Map<?, ?>) transformedRecord.key()).get("magic"));
+ assertEquals("test", ((Map<?, ?>) transformedRecord.key()).get("topic_field"));
+ assertEquals(0, ((Map<?, ?>) transformedRecord.key()).get("partition_field"));
+ assertEquals(null, ((Map<?, ?>) transformedRecord.key()).get("timestamp_field"));
+ assertEquals("my-instance-id", ((Map<?, ?>) transformedRecord.key()).get("instance_id"));
+ assertEquals(null, transformedRecord.value());
+ }
+
+ @Test
+ public void insertIntoNullKeyLeavesRecordUnchanged() {
+ final Map<String, Object> props = new HashMap<>();
+ props.put("topic.field", "topic_field!");
+ props.put("partition.field", "partition_field");
+ props.put("timestamp.field", "timestamp_field?");
+ props.put("static.field", "instance_id");
+ props.put("static.value", "my-instance-id");
+
+ xformKey.configure(props);
+
+ final SourceRecord record = new SourceRecord(null, null, "test", 0,
+ null, null, null, Collections.singletonMap("magic", 42L));
+
+ final SourceRecord transformedRecord = xformKey.apply(record);
+
+ assertSame(record, transformedRecord);
+ }
}