You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/02/12 23:08:43 UTC
[kafka] branch 2.4 updated: allow ReplaceField SMT to handle tombstone records (#7731)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new b2e4fa3 allow ReplaceField SMT to handle tombstone records (#7731)
b2e4fa3 is described below
commit b2e4fa3320cf460b55e345c5ba1ee7c563411ce5
Author: Lev Zemlyanov <lz...@purdue.edu>
AuthorDate: Wed Feb 12 14:36:06 2020 -0800
allow ReplaceField SMT to handle tombstone records (#7731)
Signed-off-by: Lev Zemlyanov <le...@confluent.io>
---
.../kafka/connect/transforms/ReplaceField.java | 4 ++-
.../kafka/connect/transforms/ReplaceFieldTest.java | 38 ++++++++++++++++++++++
2 files changed, 41 insertions(+), 1 deletion(-)
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
index f071bda..3d9abc2 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
@@ -123,7 +123,9 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
@Override
public R apply(R record) {
- if (operatingSchema(record) == null) {
+ if (operatingValue(record) == null) {
+ return record;
+ } else if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
index 6a1a13a..7ab00ed 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
public class ReplaceFieldTest {
private ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
@@ -37,6 +38,43 @@ public class ReplaceFieldTest {
}
@Test
+ public void tombstoneSchemaless() {
+ final Map<String, String> props = new HashMap<>();
+ props.put("whitelist", "abc,foo");
+ props.put("renames", "abc:xyz,foo:bar");
+
+ xform.configure(props);
+
+ final SinkRecord record = new SinkRecord("test", 0, null, null, null, null, 0);
+ final SinkRecord transformedRecord = xform.apply(record);
+
+ assertNull(transformedRecord.value());
+ assertNull(transformedRecord.valueSchema());
+ }
+
+ @Test
+ public void tombstoneWithSchema() {
+ final Map<String, String> props = new HashMap<>();
+ props.put("whitelist", "abc,foo");
+ props.put("renames", "abc:xyz,foo:bar");
+
+ xform.configure(props);
+
+ final Schema schema = SchemaBuilder.struct()
+ .field("dont", Schema.STRING_SCHEMA)
+ .field("abc", Schema.INT32_SCHEMA)
+ .field("foo", Schema.BOOLEAN_SCHEMA)
+ .field("etc", Schema.STRING_SCHEMA)
+ .build();
+
+ final SinkRecord record = new SinkRecord("test", 0, null, null, schema, null, 0);
+ final SinkRecord transformedRecord = xform.apply(record);
+
+ assertNull(transformedRecord.value());
+ assertEquals(schema, transformedRecord.valueSchema());
+ }
+
+ @Test
public void schemaless() {
final Map<String, String> props = new HashMap<>();
props.put("blacklist", "dont");