You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2022/03/01 17:58:02 UTC

[kafka] branch 3.0 updated: KAFKA-8659: fix SetSchemaMetadata failing on null value and schema (#7082)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new caf7757f KAFKA-8659: fix SetSchemaMetadata failing on null value and schema (#7082)
caf7757f is described below

commit caf7757fd4d39f1e50ae4ba3b560408dadfd6a2d
Author: Marc Löhe <bf...@users.noreply.github.com>
AuthorDate: Tue Mar 1 16:10:43 2022 +0100

    KAFKA-8659: fix SetSchemaMetadata failing on null value and schema (#7082)
    
    Make the SetSchemaMetadata SMT return a record unchanged when both its value and valueSchema (or, for the Key variant, its key and keySchema) are null.
    
    The transform has been unit tested to confirm that a record with a null value and null schema is handled gracefully, while a non-null value without a schema still fails validation with a DataException.
    
    Reviewers: Konstantine Karantasis <ko...@confluent.io>, Bill Bejeck <bb...@apache.org>
---
 .../kafka/connect/transforms/SetSchemaMetadata.java  | 16 ++++++++++++++++
 .../connect/transforms/SetSchemaMetadataTest.java    | 20 ++++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
index fd3cbf3..c83ff64 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
@@ -63,7 +63,11 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements T
 
     @Override
     public R apply(R record) {
+        final Object value = operatingValue(record);
         final Schema schema = operatingSchema(record);
+        if (value == null && schema == null) {
+            return record;
+        }
         requireSchema(schema, "updating schema metadata");
         final boolean isArray = schema.type() == Schema.Type.ARRAY;
         final boolean isMap = schema.type() == Schema.Type.MAP;
@@ -95,6 +99,8 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements T
 
     protected abstract Schema operatingSchema(R record);
 
+    protected abstract Object operatingValue(R record);
+
     protected abstract R newRecord(R record, Schema updatedSchema);
 
     /**
@@ -107,6 +113,11 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements T
         }
 
         @Override
+        protected Object operatingValue(R record) {
+            return record.key();
+        }
+
+        @Override
         protected R newRecord(R record, Schema updatedSchema) {
             Object updatedKey = updateSchemaIn(record.key(), updatedSchema);
             return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedKey, record.valueSchema(), record.value(), record.timestamp());
@@ -123,6 +134,11 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements T
         }
 
         @Override
+        protected Object operatingValue(R record) {
+            return record.value();
+        }
+
+        @Override
         protected R newRecord(R record, Schema updatedSchema) {
             Object updatedValue = updateSchemaIn(record.value(), updatedSchema);
             return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
index 04a35ca..74ac308 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -31,6 +32,7 @@ import java.util.Map;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class SetSchemaMetadataTest {
     private final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
@@ -102,6 +104,24 @@ public class SetSchemaMetadataTest {
     }
 
     @Test
+    public void valueSchemaRequired() {
+        final SinkRecord record = new SinkRecord("", 0, null, null, null, 42, 0);
+        assertThrows(DataException.class, () -> xform.apply(record));
+    }
+
+    @Test
+    public void ignoreRecordWithNullValue() {
+        final SinkRecord record = new SinkRecord("", 0, null, null, null, null, 0);
+
+        final SinkRecord updatedRecord = xform.apply(record);
+
+        assertNull(updatedRecord.key());
+        assertNull(updatedRecord.keySchema());
+        assertNull(updatedRecord.value());
+        assertNull(updatedRecord.valueSchema());
+    }
+
+    @Test
     public void updateSchemaOfStruct() {
         final String fieldName1 = "f1";
         final String fieldName2 = "f2";