You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/02/11 20:15:05 UTC
[kafka] branch 2.3 updated: KAFKA-7052 Avoiding NPE in ExtractField SMT in case of non-existent fields (#8059)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 41e32fa KAFKA-7052 Avoiding NPE in ExtractField SMT in case of non-existent fields (#8059)
41e32fa is described below
commit 41e32fa9c187d1e8fd0072fd01f977210904b038
Author: Gunnar Morling <gu...@googlemail.com>
AuthorDate: Tue Feb 11 20:48:22 2020 +0100
KAFKA-7052 Avoiding NPE in ExtractField SMT in case of non-existent fields (#8059)
Author: Gunnar Morling <gu...@googlemail.com>
Reviewer: Randall Hauch <rh...@gmail.com>
---
.../kafka/connect/transforms/ExtractField.java | 9 +++++++-
.../kafka/connect/transforms/ExtractFieldTest.java | 27 ++++++++++++++++++++++
2 files changed, 35 insertions(+), 1 deletion(-)
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
index eb8c357..bd3cbd9 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -58,7 +59,13 @@ public abstract class ExtractField<R extends ConnectRecord<R>> implements Transf
return newRecord(record, null, value == null ? null : value.get(fieldName));
} else {
final Struct value = requireStructOrNull(operatingValue(record), PURPOSE);
- return newRecord(record, schema.field(fieldName).schema(), value == null ? null : value.get(fieldName));
+ Field field = schema.field(fieldName);
+
+ if (field == null) {
+ throw new IllegalArgumentException("Unknown field: " + fieldName);
+ }
+
+ return newRecord(record, field.schema(), value == null ? null : value.get(fieldName));
}
}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
index acb0beb..a78c6d5 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
public class ExtractFieldTest {
private final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
@@ -86,4 +87,30 @@ public class ExtractFieldTest {
assertNull(transformedRecord.key());
}
+ @Test
+ public void nonExistentFieldSchemalessShouldReturnNull() {
+ xform.configure(Collections.singletonMap("field", "nonexistent"));
+
+ final SinkRecord record = new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0);
+ final SinkRecord transformedRecord = xform.apply(record);
+
+ assertNull(transformedRecord.keySchema());
+ assertNull(transformedRecord.key());
+ }
+
+ @Test
+ public void nonExistentFieldWithSchemaShouldFail() {
+ xform.configure(Collections.singletonMap("field", "nonexistent"));
+
+ final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build();
+ final Struct key = new Struct(keySchema).put("magic", 42);
+ final SinkRecord record = new SinkRecord("test", 0, keySchema, key, null, null, 0);
+
+ try {
+ xform.apply(record);
+ fail("Expected exception wasn't raised");
+ } catch (IllegalArgumentException iae) {
+ assertEquals("Unknown field: nonexistent", iae.getMessage());
+ }
+ }
}