You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/02/11 20:15:05 UTC

[kafka] branch 2.3 updated: KAFKA-7052 Avoiding NPE in ExtractField SMT in case of non-existent fields (#8059)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 41e32fa  KAFKA-7052 Avoiding NPE in ExtractField SMT in case of non-existent fields (#8059)
41e32fa is described below

commit 41e32fa9c187d1e8fd0072fd01f977210904b038
Author: Gunnar Morling <gu...@googlemail.com>
AuthorDate: Tue Feb 11 20:48:22 2020 +0100

    KAFKA-7052 Avoiding NPE in ExtractField SMT in case of non-existent fields (#8059)
    
    Author: Gunnar Morling <gu...@googlemail.com>
    Reviewer: Randall Hauch <rh...@gmail.com>
---
 .../kafka/connect/transforms/ExtractField.java     |  9 +++++++-
 .../kafka/connect/transforms/ExtractFieldTest.java | 27 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
index eb8c357..bd3cbd9 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.transforms;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -58,7 +59,13 @@ public abstract class ExtractField<R extends ConnectRecord<R>> implements Transf
             return newRecord(record, null, value == null ? null : value.get(fieldName));
         } else {
             final Struct value = requireStructOrNull(operatingValue(record), PURPOSE);
-            return newRecord(record, schema.field(fieldName).schema(), value == null ? null : value.get(fieldName));
+            Field field = schema.field(fieldName);
+
+            if (field == null) {
+                throw new IllegalArgumentException("Unknown field: " + fieldName);
+            }
+
+            return newRecord(record, field.schema(), value == null ? null : value.get(fieldName));
         }
     }
 
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
index acb0beb..a78c6d5 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 public class ExtractFieldTest {
     private final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
@@ -86,4 +87,30 @@ public class ExtractFieldTest {
         assertNull(transformedRecord.key());
     }
 
+    @Test
+    public void nonExistentFieldSchemalessShouldReturnNull() {
+        xform.configure(Collections.singletonMap("field", "nonexistent"));
+
+        final SinkRecord record = new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertNull(transformedRecord.keySchema());
+        assertNull(transformedRecord.key());
+    }
+
+    @Test
+    public void nonExistentFieldWithSchemaShouldFail() {
+        xform.configure(Collections.singletonMap("field", "nonexistent"));
+
+        final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build();
+        final Struct key = new Struct(keySchema).put("magic", 42);
+        final SinkRecord record = new SinkRecord("test", 0, keySchema, key, null, null, 0);
+
+        try {
+            xform.apply(record);
+            fail("Expected exception wasn't raised");
+        } catch (IllegalArgumentException iae) {
+            assertEquals("Unknown field: nonexistent", iae.getMessage());
+        }
+    }
 }