You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/01/29 05:39:04 UTC

[camel-kafka-connector] branch master updated: create a toHeader SMT #902

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a024c7c  create a toHeader SMT #902
a024c7c is described below

commit a024c7cff76c410b24da85e3bcf7d81154ed4fe0
Author: Luigi De Masi <ld...@redhat.com>
AuthorDate: Thu Jan 28 18:26:26 2021 +0100

    create a toHeader SMT #902
---
 .../transforms/FieldsToHeadersTransform.java       | 192 ++++++++++++
 .../transforms/FieldsToHeadersTransformTest.java   | 349 +++++++++++++++++++++
 2 files changed, 541 insertions(+)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java
new file mode 100644
index 0000000..ed74886
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.kafka.connect.transforms.util.Requirements.requireMapOrNull;
+import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
+
+public abstract class FieldsToHeadersTransform<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FieldsToHeadersTransform.class);
+
+    private static final String PURPOSE = "fields extraction to headers";
+    private static final String FIELDS_CONFIG = "fields";
+    private static final String HEADERS_CONFIG = "headers";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Fields names to extract and set to headers")
+            .define(HEADERS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Headers names to set with extracted fields");
+
+
+    private List<String> fields;
+
+    private List<String> headers;
+
+
+
+    protected abstract Schema operatingSchema(R record);
+
+    protected abstract Object operatingValue(R record);
+
+
+    @Override
+    public R apply(R r) {
+        RecordValue value = createRecordValue(r);
+        Schema currentSchema;
+        Object currentValue;
+        for (int i = 0; i < fields.size(); i++) {
+            currentSchema = value.getFieldSchema(fields.get(i));
+            currentValue = value.getFieldValue(fields.get(i));
+            r.headers().add(headers.get(i), currentValue, currentSchema);
+        }
+        return r;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+        Map<String, Object> parsedConfig = CONFIG_DEF.parse(map);
+        fields =  (List<String>) parsedConfig.getOrDefault(FIELDS_CONFIG, new ArrayList<>());
+        headers = (List<String>) parsedConfig.getOrDefault(HEADERS_CONFIG, new ArrayList<>());
+        validateConfig();
+    }
+
+    private void validateConfig() {
+
+        boolean validFields  = fields.stream().allMatch(nef -> nef != null && !nef.trim().isEmpty());
+        boolean validHeaders = headers.stream().allMatch(nef -> nef != null && !nef.trim().isEmpty());
+
+        if (!(validFields && validHeaders)) {
+            throw new IllegalArgumentException("headers and fields configuration properties cannot be null or contain empty elements.");
+        }
+        if (fields.size() > headers.size()) {
+            String fieldsWithoutCorrespondingHeaders = fields.subList(headers.size(), fields.size()).stream().collect(Collectors.joining(","));
+            throw new IllegalArgumentException("There is no corresponding header(s) configured for the following field(s): " + fieldsWithoutCorrespondingHeaders);
+        }
+        if (headers.size() > fields.size()) {
+            String headersWithoutCorrespondingFields = headers.subList(fields.size(), headers.size()).stream().collect(Collectors.joining(","));
+            LOG.warn("There is no corresponding header(s) for the following field(s): {} ", headersWithoutCorrespondingFields);
+        }
+
+    }
+
+
+    private RecordValue createRecordValue(R r) {
+        final Schema schema = operatingSchema(r);
+        if (schema == null) {
+            return new MapRecordValue(requireMapOrNull(operatingValue(r), PURPOSE));
+        }
+        return new StructRecordValue(requireStructOrNull(operatingValue(r), PURPOSE), schema);
+    }
+
+    public static class Key<R extends ConnectRecord<R>> extends FieldsToHeadersTransform<R> {
+
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.keySchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.key();
+        }
+    }
+
+    public static class Value<R extends ConnectRecord<R>> extends FieldsToHeadersTransform<R> {
+
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.valueSchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.value();
+        }
+    }
+
+    public interface RecordValue {
+
+        Object getFieldValue(String fieldName);
+
+        Schema getFieldSchema(String fieldName);
+    }
+
+    public class MapRecordValue implements RecordValue {
+
+        private Map<String, Object> map;
+
+        public MapRecordValue(Map<String, Object> map) {
+            this.map = map;
+        }
+
+        public Object getFieldValue(String fieldName) {
+            return map == null ? null : map.get(fieldName);
+        }
+
+        public Schema getFieldSchema(String fieldName) {
+            return null;
+        }
+    }
+
+    public class StructRecordValue implements RecordValue {
+
+        private Struct struct;
+
+        private Schema schema;
+
+        public StructRecordValue(Struct struct, Schema schema) {
+            this.struct = struct;
+            this.schema = schema;
+        }
+
+        public Object getFieldValue(String fieldName) {
+            return struct.get(fieldName);
+        }
+
+        public Schema getFieldSchema(String fieldName) {
+            Field field = schema.field(fieldName);
+            if (field == null) {
+                throw new IllegalArgumentException("Unknown field: " + fieldName);
+            }
+            return field.schema();
+        }
+    }
+}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransformTest.java
new file mode 100644
index 0000000..a954124
--- /dev/null
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransformTest.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import org.apache.camel.util.function.TriConsumer;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.kafka.connect.data.Schema.BOOLEAN_SCHEMA;
+import static org.apache.kafka.connect.data.Schema.BYTES_SCHEMA;
+import static org.apache.kafka.connect.data.Schema.FLOAT32_SCHEMA;
+import static org.apache.kafka.connect.data.Schema.INT32_SCHEMA;
+import static org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA;
+import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class FieldsToHeadersTransformTest {
+
+    private static List<String> fields = Arrays.asList("FROM", "TO", "CC", "SUBJECT", "BODY", "INT_EXAMPLE", "BYTE_EXAMPLE", "BOOLEAN_EXAMPLE", "FLOAT_EXAMPLE");
+    private static List<String> headers = Arrays.asList("from", "to", "cc", "subject", "body", "int_example", "byte_example", "boolean_example", "float_example");
+
+    @Test
+    public void testKeyWithSchema() {
+        FieldsToHeadersTransform.Key fieldsToHeadersTransform = new FieldsToHeadersTransform.Key();
+        ConnectRecord transformedCr = testWithSchema(fieldsToHeadersTransform, (headerSchema, headerValue) -> new SourceRecord(null, null, "testTopic", headerSchema, headerValue, null, null));
+        assertNull(transformedCr.value());
+        assertNull(transformedCr.valueSchema());
+    }
+
+    @Test
+    public void testValueWithSchema() {
+        FieldsToHeadersTransform.Value fieldsToHeadersTransform = new FieldsToHeadersTransform.Value();
+        ConnectRecord transformedCr = testWithSchema(fieldsToHeadersTransform, (schema, value) -> new SourceRecord(null, null, "testTopic", schema, value));
+        assertNull(transformedCr.key());
+        assertNull(transformedCr.keySchema());
+    }
+
+    @Test
+    public void testWithoutSchema() {
+
+        TriConsumer<ConnectRecord, String, String> assertions = (record, headerName, expectedHeaderValue) -> {
+            assertNull(record.keySchema());
+            assertEquals("testTopic", record.topic());
+            Iterator<Header> headerIterator = record.headers().allWithName(headerName);
+            assertTrue(headerIterator.hasNext());
+            Header header = headerIterator.next();
+            assertEquals(expectedHeaderValue, header.value());
+            assertNull(header.schema());
+            assertFalse(headerIterator.hasNext());
+        };
+
+        Map<String, String> conf = new HashMap<>();
+        conf.put("fields", "FROM,TO");
+        conf.put("headers", "from,to");
+
+        Map<String, String> message = new HashMap<>();
+        message.put("FROM", "bob@example.com");
+        message.put("TO", "alice@mail.com");
+
+        // Test KEY
+        FieldsToHeadersTransform.Key fieldsToHeadersTransformKey = new FieldsToHeadersTransform.Key();
+        fieldsToHeadersTransformKey.configure(conf);
+
+        final SinkRecord recordKey = new SinkRecord("testTopic", 0, null, message, null, null, 0);
+        final ConnectRecord transformedRecordKey = fieldsToHeadersTransformKey.apply(recordKey);
+
+        assertions.accept(transformedRecordKey, "from", message.get("FROM"));
+        assertions.accept(transformedRecordKey, "to", message.get("TO"));
+
+        // Test VALUE
+        FieldsToHeadersTransform.Value fieldsToHeadersTransformValue = new FieldsToHeadersTransform.Value();
+        fieldsToHeadersTransformValue.configure(conf);
+        final SinkRecord recordValue = new SinkRecord("testTopic", 0, null, null, null, message, 0);
+        final ConnectRecord transformedRecordValue = fieldsToHeadersTransformValue.apply(recordValue);
+
+        assertions.accept(transformedRecordValue, "from", message.get("FROM"));
+        assertions.accept(transformedRecordValue, "to", message.get("TO"));
+    }
+
+    @Test
+    public void fieldsWithoutCorrespondingHeadersTest() {
+        Map<String, String> conf = new HashMap<>();
+        conf.put("fields", "FROM,TO,CC,SUBJECT,BODY");
+        conf.put("headers", "from,to");
+        // key
+        final FieldsToHeadersTransform.Key fieldsToHeadersTransformKey = new FieldsToHeadersTransform.Key();
+
+        Assertions.assertThrows(IllegalArgumentException.class, () -> {
+            fieldsToHeadersTransformKey.configure(conf);
+        });
+        // value
+        final FieldsToHeadersTransform fieldsToHeadersTransformValue = new FieldsToHeadersTransform.Value<>();
+        Assertions.assertThrows(IllegalArgumentException.class, () -> {
+            fieldsToHeadersTransformValue.configure(conf);
+        });
+    }
+
+    @Test
+    public void headersWithoutCorrespondingFieldssTest() {
+        Map<String, String> conf = new HashMap<>();
+        conf.put("fields", "FROM");
+        conf.put("headers", "from,to,cc,subject,body");
+        // key
+        final FieldsToHeadersTransform.Key fieldsToHeadersTransformKey = new FieldsToHeadersTransform.Key();
+        fieldsToHeadersTransformKey.configure(conf);
+
+        // value
+        final FieldsToHeadersTransform fieldsToHeadersTransformValue = new FieldsToHeadersTransform.Value<>();
+        fieldsToHeadersTransformValue.configure(conf);
+    }
+
+    @Test
+    public void missingFieldInTheSchemaKeyTest() {
+        FieldsToHeadersTransform.Key fieldsToHeadersTransform = new FieldsToHeadersTransform.Key();
+        Schema keySchema = buildSchemaWithoutCC();
+        Struct keyValue = buildValueWithoutCC(keySchema);
+
+        fieldsToHeadersTransform.configure(buildConfig());
+        ConnectRecord record = new SourceRecord(null, null, "testTopic", keySchema, keyValue, null, null);
+
+        Assertions.assertThrows(IllegalArgumentException.class, () -> {
+            fieldsToHeadersTransform.apply(record);
+        });
+    }
+
+    @Test
+    public void missingFieldInTheSchemaValueTest() {
+        FieldsToHeadersTransform.Value fieldsToHeadersTransform = new FieldsToHeadersTransform.Value();
+        Schema valueSchema = buildSchemaWithoutCC();
+        Struct value = buildValueWithoutCC(valueSchema);
+
+        fieldsToHeadersTransform.configure(buildConfig());
+        ConnectRecord record = new SourceRecord(null, null, "testTopic", valueSchema, value);
+
+        Assertions.assertThrows(IllegalArgumentException.class, () -> {
+            fieldsToHeadersTransform.apply(record);
+        });
+    }
+
+    @Test
+    public void missingFieldInTheRecordKeyWithSchemaTest() {
+        FieldsToHeadersTransform.Key fieldsToHeadersTransform = new FieldsToHeadersTransform.Key();
+        Schema keySchema = buildSchema();
+        Struct keyValue = buildValueWithoutCC(keySchema);
+
+        fieldsToHeadersTransform.configure(buildConfig());
+        ConnectRecord record = new SourceRecord(null, null, "testTopic", keySchema, keyValue, null, null);
+
+        ConnectRecord transformedRecord = fieldsToHeadersTransform.apply(record);
+        Iterator<Header> headerIterator = transformedRecord.headers().allWithName("cc");
+        assertTrue(headerIterator.hasNext());
+        Header header = headerIterator.next();
+        assertNotNull(header);
+        assertNull(header.value());
+        assertEquals(OPTIONAL_STRING_SCHEMA, header.schema());
+        assertFalse(headerIterator.hasNext());
+    }
+
+    @Test
+    public void missingFieldInTheRecordValueWithSchemaTest() {
+        FieldsToHeadersTransform.Value fieldsToHeadersTransform = new FieldsToHeadersTransform.Value();
+        Schema valueSchema = buildSchema();
+        Struct value = buildValueWithoutCC(valueSchema);
+
+        fieldsToHeadersTransform.configure(buildConfig());
+        ConnectRecord record = new SourceRecord(null, null, "testTopic", valueSchema, value);
+
+        ConnectRecord transformedRecord = fieldsToHeadersTransform.apply(record);
+        Iterator<Header> headerIterator = transformedRecord.headers().allWithName("cc");
+        assertTrue(headerIterator.hasNext());
+        Header header = headerIterator.next();
+        assertNotNull(header);
+        assertNull(header.value());
+        assertEquals(OPTIONAL_STRING_SCHEMA, header.schema());
+        assertFalse(headerIterator.hasNext());
+
+    }
+    @Test
+    public void missingFieldWithoutSchema() {
+
+        TriConsumer<ConnectRecord, String, String> assertions = (record, headerName, expectedHeaderValue) -> {
+            assertNull(record.keySchema());
+            assertEquals("testTopic", record.topic());
+            Iterator<Header> headerIterator = record.headers().allWithName(headerName);
+            assertTrue(headerIterator.hasNext());
+            Header header = headerIterator.next();
+            assertEquals(expectedHeaderValue, header.value());
+            assertNull(header.schema());
+            assertFalse(headerIterator.hasNext());
+        };
+
+        Map<String, String> conf = new HashMap<>();
+        conf.put("fields", "FROM,TO,CC");
+        conf.put("headers", "from,to,cc");
+
+        Map<String, String> message = new HashMap<>();
+        message.put("FROM", "bob@example.com");
+        message.put("TO", "alice@mail.com");
+
+        // Test KEY
+        FieldsToHeadersTransform.Key fieldsToHeadersTransformKey = new FieldsToHeadersTransform.Key();
+        fieldsToHeadersTransformKey.configure(conf);
+
+        final SinkRecord recordKey = new SinkRecord("testTopic", 0, null, message, null, null, 0);
+        final ConnectRecord transformedRecordKey = fieldsToHeadersTransformKey.apply(recordKey);
+
+        assertions.accept(transformedRecordKey, "from", message.get("FROM"));
+        assertions.accept(transformedRecordKey, "to", message.get("TO"));
+        Iterator<Header> headerIterator = transformedRecordKey.headers().allWithName("cc");
+        assertTrue(headerIterator.hasNext());
+        Header header = headerIterator.next();
+        assertNotNull(header);
+        assertNull(header.value());
+        assertFalse(headerIterator.hasNext());
+
+        // Test VALUE
+        FieldsToHeadersTransform.Value fieldsToHeadersTransformValue = new FieldsToHeadersTransform.Value();
+        fieldsToHeadersTransformValue.configure(conf);
+        final SinkRecord recordValue = new SinkRecord("testTopic", 0, null, null, null, message, 0);
+        final ConnectRecord transformedRecordValue = fieldsToHeadersTransformValue.apply(recordValue);
+
+        assertions.accept(transformedRecordValue, "from", message.get("FROM"));
+        assertions.accept(transformedRecordValue, "to", message.get("TO"));
+
+        headerIterator = transformedRecordKey.headers().allWithName("cc");
+        assertTrue(headerIterator.hasNext());
+        header = headerIterator.next();
+        assertNotNull(header);
+        assertNull(header.value());
+        assertFalse(headerIterator.hasNext());
+    }
+
+    private static Schema buildSchemaWithoutCC() {
+        return SchemaBuilder.struct()
+                .field("FROM", STRING_SCHEMA)
+                .field("TO", STRING_SCHEMA)
+                .field("SUBJECT", STRING_SCHEMA)
+                .field("BODY", STRING_SCHEMA)
+                .field("INT_EXAMPLE", INT32_SCHEMA)
+                .field("BYTE_EXAMPLE", BYTES_SCHEMA)
+                .field("BOOLEAN_EXAMPLE", BOOLEAN_SCHEMA)
+                .field("FLOAT_EXAMPLE", FLOAT32_SCHEMA).build();
+    }
+
+    private static Struct buildValueWithoutCC(Schema schema) {
+        byte[] attachment = new byte[32];
+        new Random().nextBytes(attachment);
+        return new Struct(schema)
+                .put("FROM", "bob@example.com")
+                .put("TO", "alice@mail.com")
+                .put("SUBJECT", "Needs Attention")
+                .put("BODY", "there is an issue that needs your attention")
+                .put("INT_EXAMPLE", 34)
+                .put("BYTE_EXAMPLE", attachment)
+                .put("BOOLEAN_EXAMPLE", true)
+                .put("FLOAT_EXAMPLE", 34.5F);
+    }
+
+    private static Schema buildSchema() {
+        return SchemaBuilder.struct()
+                .field("FROM", STRING_SCHEMA)
+                .field("TO", STRING_SCHEMA)
+                .field("CC", OPTIONAL_STRING_SCHEMA)
+                .field("SUBJECT", STRING_SCHEMA)
+                .field("BODY", STRING_SCHEMA)
+                .field("INT_EXAMPLE", INT32_SCHEMA)
+                .field("BYTE_EXAMPLE", BYTES_SCHEMA)
+                .field("BOOLEAN_EXAMPLE", BOOLEAN_SCHEMA)
+                .field("FLOAT_EXAMPLE", FLOAT32_SCHEMA).build();
+    }
+
+    private static Struct buildValue(Schema schema) {
+        byte[] attachment = new byte[32];
+        new Random().nextBytes(attachment);
+        return new Struct(schema)
+                .put("FROM", "bob@example.com")
+                .put("TO", "alice@mail.com")
+                .put("CC", "managers@enterprise.com")
+                .put("SUBJECT", "Needs Attention")
+                .put("BODY", "there is an issue that needs your attention")
+                .put("INT_EXAMPLE", 34)
+                .put("BYTE_EXAMPLE", attachment)
+                .put("BOOLEAN_EXAMPLE", true)
+                .put("FLOAT_EXAMPLE", 34.5F);
+    }
+
+    private static Map<String, String> buildConfig() {
+        Map map = new HashMap();
+        map.put("fields", fields.stream().collect(Collectors.joining(",")));
+        map.put("headers", headers.stream().collect(Collectors.joining(",")));
+        return map;
+    }
+
+    public ConnectRecord testWithSchema(FieldsToHeadersTransform fieldsToHeadersTransform, BiFunction<Schema, Struct, ConnectRecord> createRecord) {
+        Schema valueSchema = buildSchema();
+
+        Struct value = buildValue(valueSchema);
+        fieldsToHeadersTransform.configure(buildConfig());
+        ConnectRecord record = createRecord.apply(valueSchema, value);
+        ConnectRecord transformedCr = fieldsToHeadersTransform.apply(record);
+
+        assertEquals("testTopic", transformedCr.topic());
+        Iterator<Header> headerIterator;
+        Header header;
+        for (int i = 0; i < fields.size(); i++) {
+            headerIterator = transformedCr.headers().allWithName(headers.get(i));
+            assertTrue(headerIterator.hasNext());
+            header = headerIterator.next();
+            assertEquals(value.get(fields.get(i)), header.value());
+            assertEquals(valueSchema.field(fields.get(i)).schema(), header.schema());
+            assertFalse(headerIterator.hasNext());
+        }
+        return transformedCr;
+    }
+}