You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/03 16:10:11 UTC

[GitHub] [kafka] tombentley opened a new pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

tombentley opened a new pull request #9549:
URL: https://github.com/apache/kafka/pull/9549


   These SMTs were originally specified in KIP-145 but never implemented at the time.
   
   HeaderTo is not included since its original specification doesn't deal with the fact that there can be >1 header with the same name, but a field can only have a single value (while you could use an array, that doesn't work if the headers for the given name had different schemas).
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r523161370



##########
File path: connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class HeaderFromTest {
+
+    private final boolean keyTransform;
+
+    static class RecordBuilder {
+        private final List<String> fields = new ArrayList<>(2);
+        private final List<Schema> fieldSchemas = new ArrayList<>(2);
+        private final List<Object> fieldValues = new ArrayList<>(2);
+        private final ConnectHeaders headers = new ConnectHeaders();
+
+        public RecordBuilder() {
+        }
+
+        public RecordBuilder withField(String name, Schema schema, Object value) {
+            fields.add(name);
+            fieldSchemas.add(schema);
+            fieldValues.add(value);
+            return this;
+        }
+
+        public RecordBuilder addHeader(String name, Schema schema, Object value) {
+            headers.add(name, new SchemaAndValue(schema, value));
+            return this;
+        }
+
+        public SourceRecord schemaless(boolean keyTransform) {
+            Map<String, Object> map = new HashMap<>();
+            for (int i = 0; i < this.fields.size(); i++) {
+                String fieldName = this.fields.get(i);
+                map.put(fieldName, this.fieldValues.get(i));
+
+            }
+            return sourceRecord(keyTransform, null, map);
+        }
+
+        private Schema schema() {
+            SchemaBuilder schemaBuilder = new SchemaBuilder(Schema.Type.STRUCT);
+            for (int i = 0; i < this.fields.size(); i++) {
+                String fieldName = this.fields.get(i);
+                schemaBuilder.field(fieldName, this.fieldSchemas.get(i));
+
+            }
+            return schemaBuilder.build();
+        }
+
+        private Struct struct(Schema schema) {
+            Struct struct = new Struct(schema);
+            for (int i = 0; i < this.fields.size(); i++) {
+                String fieldName = this.fields.get(i);
+                struct.put(fieldName, this.fieldValues.get(i));
+            }
+            return struct;
+        }
+
+        public SourceRecord withSchema(boolean keyTransform) {
+            Schema schema = schema();
+            Struct struct = struct(schema);
+            return sourceRecord(keyTransform, schema, struct);
+        }
+
+        private SourceRecord sourceRecord(boolean keyTransform, Schema keyOrValueSchema, Object keyOrValue) {
+            Map<String, ?> sourcePartition = singletonMap("foo", "bar");
+            Map<String, ?> sourceOffset = singletonMap("baz", "quxx");
+            String topic = "topic";
+            Integer partition = 0;
+            Long timestamp = 0L;
+
+            ConnectHeaders headers = this.headers;
+            if (keyOrValueSchema == null) {
+                // When doing a schemaless transformation we don't expect the header to have a schema
+                headers = new ConnectHeaders();
+                for (Header header : this.headers) {
+                    headers.add(header.key(), new SchemaAndValue(null, header.value()));
+                }
+            }
+            return new SourceRecord(sourcePartition, sourceOffset, topic, partition,
+                    keyTransform ? keyOrValueSchema : null,
+                    keyTransform ? keyOrValue : "key",
+                    !keyTransform ? keyOrValueSchema : null,
+                    !keyTransform ? keyOrValue : "value",
+                    timestamp, headers);
+        }
+
+        @Override
+        public String toString() {
+            return "RecordBuilder(" +
+                    "fields=" + fields +
+                    ", fieldSchemas=" + fieldSchemas +
+                    ", fieldValues=" + fieldValues +
+                    ", headers=" + headers +
+                    ')';
+        }
+    }
+
+    @Parameterized.Parameters(name = "{0}: testKey={1}, xformFields={3}, xformHeaders={4}, operation={5}")
+    public static Collection<Object[]> data() {
+
+        List<Object[]> result = new ArrayList<>();
+
+
+
+        for (Boolean testKeyTransform : asList(true, false)) {
+            result.add(
+                new Object[]{
+                    "basic copy",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.COPY,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                });
+            result.add(
+                new Object[]{
+                    "basic move",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.MOVE,
+                    new RecordBuilder()
+                        // field1 got moved
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                });
+            result.add(
+                new Object[]{
+                    "copy with preexisting header",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.COPY,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                });
+            result.add(
+                new Object[]{
+                    "move with preexisting header",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.MOVE,
+                    new RecordBuilder()
+                        // field1 got moved
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                });
+            Schema schema = new SchemaBuilder(Schema.Type.STRUCT).field("foo", STRING_SCHEMA).build();
+            Struct struct = new Struct(schema).put("foo", "foo-value");
+            result.add(
+                new Object[]{
+                    "copy with struct value",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", schema, struct)
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.COPY,
+                    new RecordBuilder()
+                        .withField("field1", schema, struct)
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", schema, struct)
+                });
+            result.add(
+                new Object[]{
+                    "move with struct value",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", schema, struct)
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.MOVE,
+                    new RecordBuilder()
+                        // field1 got moved
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", schema, struct)
+                });
+            result.add(
+                new Object[]{
+                    "two headers from same field",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    // two headers from the same field
+                    asList("field1", "field1"), asList("inserted1", "inserted2"), HeaderFrom.Operation.MOVE,
+                    new RecordBuilder()
+                        // field1 got moved
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                        .addHeader("inserted2", STRING_SCHEMA, "field1-value")
+                });
+            result.add(
+                new Object[]{
+                    "two fields to same header",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    // two headers from the same field
+                    asList("field1", "field2"), asList("inserted1", "inserted1"), HeaderFrom.Operation.MOVE,
+                    new RecordBuilder()
+                        // field1 and field2 got moved
+                        .addHeader("header1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "field2-value")
+                });
+        }
+        return result;
+    }
+
+    private final HeaderFrom<SourceRecord> xform;
+
+    private final RecordBuilder originalRecordBuilder;
+    private final RecordBuilder expectedRecordBuilder;
+    private final List<String> transformFields;
+    private final List<String> headers;
+    private final HeaderFrom.Operation operation;
+
+    public HeaderFromTest(String description,
+                          boolean keyTransform,
+                          RecordBuilder originalBuilder,
+                          List<String> transformFields, List<String> headers, HeaderFrom.Operation operation,
+                          RecordBuilder expectedBuilder) {
+        this.keyTransform = keyTransform;
+        this.xform = keyTransform ? new HeaderFrom.Key<>() : new HeaderFrom.Value<>();
+        this.originalRecordBuilder = originalBuilder;
+        this.expectedRecordBuilder = expectedBuilder;
+        this.transformFields = transformFields;
+        this.headers = headers;
+        this.operation = operation;
+    }
+
+    private Map<String, Object> config() {
+        Map<String, Object> result = new HashMap<>();
+        result.put(HeaderFrom.HEADERS_FIELD, headers);
+        result.put(HeaderFrom.FIELDS_FIELD, transformFields);
+        result.put(HeaderFrom.OPERATION_FIELD, operation.toString());
+        return result;
+    }
+
+    @Test
+    public void schemaless() {
+        xform.configure(config());
+        ConnectHeaders headers = new ConnectHeaders();
+        headers.addString("existing", "existing-value");
+
+        SourceRecord originalRecord = originalRecordBuilder.schemaless(keyTransform);
+        SourceRecord expectedRecord = expectedRecordBuilder.schemaless(keyTransform);
+        SourceRecord xformed = xform.apply(originalRecord);
+        assertSameRecord(expectedRecord, xformed);
+    }
+
+    @Test
+    public void withSchema() {
+        xform.configure(config());
+        ConnectHeaders headers = new ConnectHeaders();
+        headers.addString("existing", "existing-value");
+        Headers expect = headers.duplicate();
+        for (int i = 0; i < this.headers.size(); i++) {
+            expect.add(this.headers.get(i), originalRecordBuilder.fieldValues.get(i), originalRecordBuilder.fieldSchemas.get(i));
+        }
+
+        SourceRecord originalRecord = originalRecordBuilder.withSchema(keyTransform);
+        SourceRecord expectedRecord = expectedRecordBuilder.withSchema(keyTransform);
+        SourceRecord xformed = xform.apply(originalRecord);
+        assertSameRecord(expectedRecord, xformed);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void invalidConfig() {
+        Map<String, Object> config = config();
+        List<String> headers = new ArrayList<>(this.headers);
+        headers.add("unexpected");
+        config.put(HeaderFrom.HEADERS_FIELD, headers);
+        xform.configure(config);
+    }

Review comment:
       Little silly to run this for every iteration of the parameterized test 😛 
   I'm guessing there isn't an easy way to run this only once?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#issuecomment-721226685


   @kkonstantine would you be able to review this? These SMTs were originally specified in [KIP145](https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect#KIP145ExposeRecordHeadersinKafkaConnect-HeaderFrom) but never implemented. It seems they were forgotten about.
   
   @C0urante maybe you'd also like to review?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r613931955



##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.NonEmptyListValidator;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+    private static final String MOVE_OPERATION = "move";
+    private static final String COPY_OPERATION = "copy";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST,
+                    NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+                    ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST,
+                    NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+                    ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in(MOVE_OPERATION, COPY_OPERATION), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE(MOVE_OPERATION),
+        COPY(COPY_OPERATION);
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case MOVE_OPERATION:
+                    return MOVE;
+                case COPY_OPERATION:
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();

Review comment:
       Ok, that's fair enough. Thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#issuecomment-734421216


   @kkonstantine, @rhauch did you get a chance to look at this?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#issuecomment-727852419


   @kkonstantine or perhaps @rhauch  please could one of you take a look?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r613926107



##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.NonEmptyListValidator;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+    private static final String MOVE_OPERATION = "move";
+    private static final String COPY_OPERATION = "copy";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST,
+                    NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+                    ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST,
+                    NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+                    ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in(MOVE_OPERATION, COPY_OPERATION), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE(MOVE_OPERATION),
+        COPY(COPY_OPERATION);
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case MOVE_OPERATION:
+                    return MOVE;
+                case COPY_OPERATION:
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();

Review comment:
       It is impossible due to the `ConfigDef.ValidString.in(MOVE_OPERATION, COPY_OPERATION)`, so this is really an assertion failure. The line number in the stacktrace would be enough to track it down if it ever did happen due to a later refactoring, so imho an error message is of no value. But I'm happy to add one if you like.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#issuecomment-821204260


   Failing tests are those reported in [KAFKA-12629](https://issues.apache.org/jira/browse/KAFKA-12629), [KAFKA-12284](https://issues.apache.org/jira/browse/KAFKA-12284) and [KAFKA-9295](https://issues.apache.org/jira/browse/KAFKA-9295):
   
   ```
   Build / JDK 11 and Scala 2.13 / testCreateClusterAndCreateAndManyTopicsWithManyPartitions() – kafka.server.RaftClusterTest32s
   Build / JDK 8 and Scala 2.12 / testReplication() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest1m 32s
   Build / JDK 8 and Scala 2.12 / testReplicationWithEmptyPartition() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest1m 26s
   Build / JDK 8 and Scala 2.12 / testOneWayReplicationWithAutoOffsetSync() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest1m 23s
   Build / JDK 8 and Scala 2.12 / testReplication() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest1m 27s
   Build / JDK 8 and Scala 2.12 / testReplicationWithEmptyPartition() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest1m 16s
   Build / JDK 8 and Scala 2.12 / shouldInnerJoinMultiPartitionQueryable – org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest
   ```
   
   All these pass using the relevant JDKs on my machine. Merging to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley merged pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
tombentley merged pull request #9549:
URL: https://github.com/apache/kafka/pull/9549


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#issuecomment-724213528


   Thanks for reaching out @tombentley! Happy to take a look.
   
   For reference, maybe you could add a link to https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect#KIP145ExposeRecordHeadersinKafkaConnect-Transformations to the description?
   
   Will begin reviewing shortly.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r521215081



##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");

Review comment:
       `MOVE.name` is not a compile time constant (at least the compiler doesn't see it that way). I factored out constants to avoid the duplication of literals.

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE("move"),
+        COPY("copy");
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case "move":
+                    return MOVE;
+                case "copy":
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+
+        public String toString() {
+            return name;
+        }
+    }
+
+    private List<String> fields;
+
+    private List<String> headers;
+
+    private Operation operation;
+
+    @Override
+    public R apply(R record) {
+        Object operatingValue = operatingValue(record);
+        Schema operatingSchema = operatingSchema(record);
+
+        if (operatingSchema == null) {
+            return applySchemaless(record, operatingValue);
+        } else {
+            return applyWithSchema(record, operatingValue, operatingSchema);
+        }
+    }
+
+    private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
+        Headers updatedHeaders = record.headers().duplicate();

Review comment:
       I wasn't completely sure whether a transformation was allowed to mutate headers, since it has to create a new `ConnectRecord`. Take this code from `WorkerSourceTask` as an example:
   
   ```java
               final SourceRecord record = transformationChain.apply(preTransformRecord);
               final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
               if (producerRecord == null || retryWithToleranceOperator.failed()) {
                   counter.skipRecord();
                   commitTaskRecord(preTransformRecord, null);
                   continue;
               }
   ```
   
   See how `preTransformRecord` can be used after the transformation chain has been applied? I think it would be incorrect for `commitTaskRecord` to commit the original record but with headers which had been mutated by transformations, right?

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE("move"),
+        COPY("copy");
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case "move":
+                    return MOVE;
+                case "copy":
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+
+        public String toString() {
+            return name;
+        }
+    }
+
+    private List<String> fields;
+
+    private List<String> headers;
+
+    private Operation operation;
+
+    @Override
+    public R apply(R record) {
+        Object operatingValue = operatingValue(record);
+        Schema operatingSchema = operatingSchema(record);
+
+        if (operatingSchema == null) {
+            return applySchemaless(record, operatingValue);
+        } else {
+            return applyWithSchema(record, operatingValue, operatingSchema);
+        }
+    }
+
+    private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
+        Headers updatedHeaders = record.headers().duplicate();
+        Struct value = Requirements.requireStruct(operatingValue, "header " + operation);
+        final Schema updatedSchema;
+        if (operation == Operation.MOVE) {
+            updatedSchema = moveSchema(operatingSchema);
+        } else {
+            updatedSchema = operatingSchema;
+        }
+        final Struct updatedValue = new Struct(updatedSchema);
+        for (Field field : updatedSchema.fields()) {
+            updatedValue.put(field, value.get(field.name()));
+        }
+        for (int i = 0; i < fields.size(); i++) {
+            String fieldName = fields.get(i);
+            String headerName = headers.get(i);
+            Object fieldValue = value.get(fieldName);
+            Schema fieldSchema = operatingSchema.field(fieldName).schema();
+            updatedHeaders.add(headerName, fieldValue, fieldSchema);
+        }
+        return newRecord(record, updatedSchema, updatedValue, updatedHeaders);
+    }
+
+    private Schema moveSchema(Schema operatingSchema) {
+        final Schema updatedSchema;
+        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(operatingSchema, SchemaBuilder.struct());
+        for (Field field : operatingSchema.fields()) {
+            if (!fields.contains(field.name())) {
+                builder.field(field.name(), field.schema());
+            }
+        }
+        updatedSchema = builder.build();
+        return updatedSchema;
+    }

Review comment:
       Good idea. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#issuecomment-820299395


   Not sure why the build had failed, I've rekicked it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#issuecomment-725301273


   @C0urante thanks for the review, some really helpful comments there.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#issuecomment-814831822


   @mimaison I went ahead and enforce non-nullness and non-emptiness, if you want to do another pass?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r524016349



##########
File path: connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class HeaderFromTest {
+
+    private final boolean keyTransform;
+
+    static class RecordBuilder {
+        private final List<String> fields = new ArrayList<>(2);
+        private final List<Schema> fieldSchemas = new ArrayList<>(2);
+        private final List<Object> fieldValues = new ArrayList<>(2);
+        private final ConnectHeaders headers = new ConnectHeaders();
+
+        public RecordBuilder() {
+        }
+
+        public RecordBuilder withField(String name, Schema schema, Object value) {
+            fields.add(name);
+            fieldSchemas.add(schema);
+            fieldValues.add(value);
+            return this;
+        }
+
+        public RecordBuilder addHeader(String name, Schema schema, Object value) {
+            headers.add(name, new SchemaAndValue(schema, value));
+            return this;
+        }
+
+        public SourceRecord schemaless(boolean keyTransform) {
+            Map<String, Object> map = new HashMap<>();
+            for (int i = 0; i < this.fields.size(); i++) {
+                String fieldName = this.fields.get(i);
+                map.put(fieldName, this.fieldValues.get(i));
+
+            }
+            return sourceRecord(keyTransform, null, map);
+        }
+
+        private Schema schema() {
+            SchemaBuilder schemaBuilder = new SchemaBuilder(Schema.Type.STRUCT);
+            for (int i = 0; i < this.fields.size(); i++) {
+                String fieldName = this.fields.get(i);
+                schemaBuilder.field(fieldName, this.fieldSchemas.get(i));
+
+            }
+            return schemaBuilder.build();
+        }
+
+        private Struct struct(Schema schema) {
+            Struct struct = new Struct(schema);
+            for (int i = 0; i < this.fields.size(); i++) {
+                String fieldName = this.fields.get(i);
+                struct.put(fieldName, this.fieldValues.get(i));
+            }
+            return struct;
+        }
+
+        public SourceRecord withSchema(boolean keyTransform) {
+            Schema schema = schema();
+            Struct struct = struct(schema);
+            return sourceRecord(keyTransform, schema, struct);
+        }
+
+        private SourceRecord sourceRecord(boolean keyTransform, Schema keyOrValueSchema, Object keyOrValue) {
+            Map<String, ?> sourcePartition = singletonMap("foo", "bar");
+            Map<String, ?> sourceOffset = singletonMap("baz", "quxx");
+            String topic = "topic";
+            Integer partition = 0;
+            Long timestamp = 0L;
+
+            ConnectHeaders headers = this.headers;
+            if (keyOrValueSchema == null) {
+                // When doing a schemaless transformation we don't expect the header to have a schema
+                headers = new ConnectHeaders();
+                for (Header header : this.headers) {
+                    headers.add(header.key(), new SchemaAndValue(null, header.value()));
+                }
+            }
+            return new SourceRecord(sourcePartition, sourceOffset, topic, partition,
+                    keyTransform ? keyOrValueSchema : null,
+                    keyTransform ? keyOrValue : "key",
+                    !keyTransform ? keyOrValueSchema : null,
+                    !keyTransform ? keyOrValue : "value",
+                    timestamp, headers);
+        }
+
+        @Override
+        public String toString() {
+            return "RecordBuilder(" +
+                    "fields=" + fields +
+                    ", fieldSchemas=" + fieldSchemas +
+                    ", fieldValues=" + fieldValues +
+                    ", headers=" + headers +
+                    ')';
+        }
+    }
+
+    @Parameterized.Parameters(name = "{0}: testKey={1}, xformFields={3}, xformHeaders={4}, operation={5}")
+    public static Collection<Object[]> data() {
+
+        List<Object[]> result = new ArrayList<>();
+
+
+
+        for (Boolean testKeyTransform : asList(true, false)) {
+            result.add(
+                new Object[]{
+                    "basic copy",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.COPY,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                });
+            result.add(
+                new Object[]{
+                    "basic move",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.MOVE,
+                    new RecordBuilder()
+                        // field1 got moved
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                });
+            result.add(
+                new Object[]{
+                    "copy with preexisting header",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.COPY,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                });
+            result.add(
+                new Object[]{
+                    "move with preexisting header",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.MOVE,
+                    new RecordBuilder()
+                        // field1 got moved
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                });
+            Schema schema = new SchemaBuilder(Schema.Type.STRUCT).field("foo", STRING_SCHEMA).build();
+            Struct struct = new Struct(schema).put("foo", "foo-value");
+            result.add(
+                new Object[]{
+                    "copy with struct value",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", schema, struct)
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.COPY,
+                    new RecordBuilder()
+                        .withField("field1", schema, struct)
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", schema, struct)
+                });
+            result.add(
+                new Object[]{
+                    "move with struct value",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", schema, struct)
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.MOVE,
+                    new RecordBuilder()
+                        // field1 got moved
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", schema, struct)
+                });
+            result.add(
+                new Object[]{
+                    "two headers from same field",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    // two headers from the same field
+                    asList("field1", "field1"), asList("inserted1", "inserted2"), HeaderFrom.Operation.MOVE,
+                    new RecordBuilder()
+                        // field1 got moved
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                        .addHeader("inserted2", STRING_SCHEMA, "field1-value")
+                });
+            result.add(
+                new Object[]{
+                    "two fields to same header",
+                    testKeyTransform,
+                    new RecordBuilder()
+                        .withField("field1", STRING_SCHEMA, "field1-value")
+                        .withField("field2", STRING_SCHEMA, "field2-value")
+                        .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    // two headers from the same field
+                    asList("field1", "field2"), asList("inserted1", "inserted1"), HeaderFrom.Operation.MOVE,
+                    new RecordBuilder()
+                        // field1 and field2 got moved
+                        .addHeader("header1", STRING_SCHEMA, "existing-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                        .addHeader("inserted1", STRING_SCHEMA, "field2-value")
+                });
+        }
+        return result;
+    }
+
+    private final HeaderFrom<SourceRecord> xform;
+
+    private final RecordBuilder originalRecordBuilder;
+    private final RecordBuilder expectedRecordBuilder;
+    private final List<String> transformFields;
+    private final List<String> headers;
+    private final HeaderFrom.Operation operation;
+
+    public HeaderFromTest(String description,
+                          boolean keyTransform,
+                          RecordBuilder originalBuilder,
+                          List<String> transformFields, List<String> headers, HeaderFrom.Operation operation,
+                          RecordBuilder expectedBuilder) {
+        this.keyTransform = keyTransform;
+        this.xform = keyTransform ? new HeaderFrom.Key<>() : new HeaderFrom.Value<>();
+        this.originalRecordBuilder = originalBuilder;
+        this.expectedRecordBuilder = expectedBuilder;
+        this.transformFields = transformFields;
+        this.headers = headers;
+        this.operation = operation;
+    }
+
+    private Map<String, Object> config() {
+        Map<String, Object> result = new HashMap<>();
+        result.put(HeaderFrom.HEADERS_FIELD, headers);
+        result.put(HeaderFrom.FIELDS_FIELD, transformFields);
+        result.put(HeaderFrom.OPERATION_FIELD, operation.toString());
+        return result;
+    }
+
+    @Test
+    public void schemaless() {
+        xform.configure(config());
+        ConnectHeaders headers = new ConnectHeaders();
+        headers.addString("existing", "existing-value");
+
+        SourceRecord originalRecord = originalRecordBuilder.schemaless(keyTransform);
+        SourceRecord expectedRecord = expectedRecordBuilder.schemaless(keyTransform);
+        SourceRecord xformed = xform.apply(originalRecord);
+        assertSameRecord(expectedRecord, xformed);
+    }
+
+    @Test
+    public void withSchema() {
+        xform.configure(config());
+        ConnectHeaders headers = new ConnectHeaders();
+        headers.addString("existing", "existing-value");
+        Headers expect = headers.duplicate();
+        for (int i = 0; i < this.headers.size(); i++) {
+            expect.add(this.headers.get(i), originalRecordBuilder.fieldValues.get(i), originalRecordBuilder.fieldSchemas.get(i));
+        }
+
+        SourceRecord originalRecord = originalRecordBuilder.withSchema(keyTransform);
+        SourceRecord expectedRecord = expectedRecordBuilder.withSchema(keyTransform);
+        SourceRecord xformed = xform.apply(originalRecord);
+        assertSameRecord(expectedRecord, xformed);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void invalidConfig() {
+        Map<String, Object> config = config();
+        List<String> headers = new ArrayList<>(this.headers);
+        headers.add("unexpected");
+        config.put(HeaderFrom.HEADERS_FIELD, headers);
+        xform.configure(config);
+    }

Review comment:
       Yeah, parameterized tests do have their faults. AFAIK the only way to handle this using Junit 4 is to have two tests. You can arrange to have both tests in the same source file so both parameterised and non-parameterised tests are still easily discoverable (e.g. [using Enclosed](https://stackoverflow.com/questions/3361239/excluding-a-non-param-test-in-parameterized-test-class)), but I've never seen that done in practice and it looks rather fiddly. Since these tests execute pretty quickly, I'm inclined to think we should just live with it. The real solution might just be to adopt Junit 5.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r613913901



##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.NonEmptyListValidator;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+    private static final String MOVE_OPERATION = "move";
+    private static final String COPY_OPERATION = "copy";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST,
+                    NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+                    ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST,
+                    NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+                    ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in(MOVE_OPERATION, COPY_OPERATION), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE(MOVE_OPERATION),
+        COPY(COPY_OPERATION);
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case MOVE_OPERATION:
+                    return MOVE;
+                case COPY_OPERATION:
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();

Review comment:
       Even though I don't think it's reachable by users, should we have a message here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r588262888



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
##########
@@ -18,11 +18,15 @@
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.transforms.Cast;
+import org.apache.kafka.connect.transforms.DropHeaders;
 import org.apache.kafka.connect.transforms.ExtractField;
 import org.apache.kafka.connect.transforms.Filter;
 import org.apache.kafka.connect.transforms.Flatten;
+import org.apache.kafka.connect.transforms.HeaderFrom;
+import org.apache.kafka.connect.transforms.HeaderTo;

Review comment:
       We don't have this transformation!

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.List;
+import java.util.Map;
+
+public class DropHeaders<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Removes one or more headers from each record.";
+
+    public static final String HEADERS_FIELD = "headers";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "The name of the headers to be removed.");
+
+    private List<String> headers;
+
+    @Override
+    public R apply(R record) {
+        Headers updatedHeaders = record.headers().duplicate();
+        for (String name : headers) {
+            updatedHeaders.remove(name);

Review comment:
       Because `Headers` is a `LinkedList`, `remove()` has to iterate the whole list each time. I wonder if we could instead start from an empty headers list and add the headers not being removed?

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;

Review comment:
       Unused import

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.List;
+import java.util.Map;
+
+public class DropHeaders<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Removes one or more headers from each record.";
+
+    public static final String HEADERS_FIELD = "headers";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "The name of the headers to be removed.");
+
+    private List<String> headers;
+
+    @Override
+    public R apply(R record) {
+        Headers updatedHeaders = record.headers().duplicate();
+        for (String name : headers) {
+            updatedHeaders.remove(name);
+        }
+        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+                record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
+    }
+

Review comment:
       nit, extra line

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.List;
+import java.util.Map;
+
+public class DropHeaders<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Removes one or more headers from each record.";
+
+    public static final String HEADERS_FIELD = "headers";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "The name of the headers to be removed.");
+
+    private List<String> headers;
+
+    @Override
+    public R apply(R record) {
+        Headers updatedHeaders = record.headers().duplicate();
+        for (String name : headers) {
+            updatedHeaders.remove(name);
+        }
+        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+                record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
+    }
+
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+

Review comment:
       nit, extra line

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+    private static final String MOVE_OPERATION = "move";
+    private static final String COPY_OPERATION = "copy";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,

Review comment:
       Should we reuse `MOVE_OPERATION` and `COPY_OPERATION` here?

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Values;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Map;
+
+public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Add a header to each record.";
+
+    public static final String HEADER_FIELD = "header";
+    public static final String VALUE_LITERAL_FIELD = "value.literal";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HEADER_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The name of the header.")
+            .define(VALUE_LITERAL_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,

Review comment:
       Should we enforce these fields are not null?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r608546217



##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Values;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Map;
+
+public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Add a header to each record.";
+
+    public static final String HEADER_FIELD = "header";
+    public static final String VALUE_LITERAL_FIELD = "value.literal";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HEADER_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The name of the header.")
+            .define(VALUE_LITERAL_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,

Review comment:
       I guess that makes sense @mimaison, but then for consistency shouldn't we make `DropHeaders.headers`, `HeaderFrom.headers` and `HeaderFrom.fields` reject empty lists?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r520056667



##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE("move"),
+        COPY("copy");
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case "move":
+                    return MOVE;
+                case "copy":
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+
+        public String toString() {
+            return name;
+        }
+    }
+
+    private List<String> fields;
+
+    private List<String> headers;
+
+    private Operation operation;
+
+    @Override
+    public R apply(R record) {
+        Object operatingValue = operatingValue(record);
+        Schema operatingSchema = operatingSchema(record);
+
+        if (operatingSchema == null) {
+            return applySchemaless(record, operatingValue);
+        } else {
+            return applyWithSchema(record, operatingValue, operatingSchema);
+        }
+    }
+
+    private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
+        Headers updatedHeaders = record.headers().duplicate();

Review comment:
       Why duplicate headers here? According to the [Header class's Javadocs](https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/header/Headers.html), the collection should be mutable.

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");

Review comment:
       Might make sense to refer to the `MOVE.name` and `COPY.name` fields declared in the `Operation` enum below instead of using the literal `"move"` and `"copy"` strings in this section.

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.List;
+import java.util.Map;
+
+public class DropHeaders<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Removes one or more headers from each record.";
+
+    public static final String HEADERS_FIELD = "header.names";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "The name of the headers to be removed.");
+
+    private List<String> headers;
+
+    @Override
+    public R apply(R record) {
+        Headers updatedHeaders = record.headers().duplicate();

Review comment:
       Same question here RE: duplication

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Map;
+
+public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Add a header to each record.";
+
+    public static final String HEADER_FIELD = "header";
+    public static final String VALUE_LITERAL_FIELD = "value.literal";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HEADER_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The name of the header.")
+            .define(VALUE_LITERAL_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The literal value that is to be set as the header value on all records.");
+
+    private String header;
+
+    private String literalValue;

Review comment:
       I think this is supposed to be parsed using the `Values` class, which would allow users to specify integral, floating point, boolean, and even structured and nested types here that would then be correctly picked up by the framework and used in the resulting header.

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE("move"),
+        COPY("copy");
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case "move":
+                    return MOVE;
+                case "copy":
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+
+        public String toString() {
+            return name;
+        }
+    }
+
+    private List<String> fields;
+
+    private List<String> headers;
+
+    private Operation operation;
+
+    @Override
+    public R apply(R record) {
+        Object operatingValue = operatingValue(record);
+        Schema operatingSchema = operatingSchema(record);
+
+        if (operatingSchema == null) {
+            return applySchemaless(record, operatingValue);
+        } else {
+            return applyWithSchema(record, operatingValue, operatingSchema);
+        }
+    }
+
+    private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
+        Headers updatedHeaders = record.headers().duplicate();
+        Struct value = Requirements.requireStruct(operatingValue, "header " + operation);
+        final Schema updatedSchema;
+        if (operation == Operation.MOVE) {
+            updatedSchema = moveSchema(operatingSchema);
+        } else {
+            updatedSchema = operatingSchema;
+        }
+        final Struct updatedValue = new Struct(updatedSchema);
+        for (Field field : updatedSchema.fields()) {
+            updatedValue.put(field, value.get(field.name()));
+        }
+        for (int i = 0; i < fields.size(); i++) {
+            String fieldName = fields.get(i);
+            String headerName = headers.get(i);
+            Object fieldValue = value.get(fieldName);

Review comment:
       Worth noting that if there's no field with this name in the schema for the record, this will throw an exception and (unless it's configured with an error tolerance to ignore such issues), fail the task.
   
   This isn't unreasonable behavior, especially given that that case isn't covered in the KIP, but it's different from what happens right now in the `applySchemaless` method, which is to silently add a null header instead.
   
   It's probably best to ensure the same behavior happens in either case. I don't have a strong preference either way but I'm tentatively leaning towards silently adding `null` headers since it'll make working with heterogeneous topics easier.

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.List;
+import java.util.Map;
+
+public class DropHeaders<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Removes one or more headers from each record.";
+
+    public static final String HEADERS_FIELD = "header.names";

Review comment:
       Looks like this has two different names in the KIP; it's referenced as `headers` in the textual description but as `header.names` in the sample config.
   
   I think `headers` makes more sense here given that it would align with the properties defined in the `InsertHeader` and `HeaderFrom` transformations.

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE("move"),
+        COPY("copy");
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case "move":
+                    return MOVE;
+                case "copy":
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+
+        public String toString() {
+            return name;
+        }
+    }
+
+    private List<String> fields;
+
+    private List<String> headers;
+
+    private Operation operation;
+
+    @Override
+    public R apply(R record) {
+        Object operatingValue = operatingValue(record);
+        Schema operatingSchema = operatingSchema(record);
+
+        if (operatingSchema == null) {
+            return applySchemaless(record, operatingValue);
+        } else {
+            return applyWithSchema(record, operatingValue, operatingSchema);
+        }
+    }
+
+    private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
+        Headers updatedHeaders = record.headers().duplicate();
+        Struct value = Requirements.requireStruct(operatingValue, "header " + operation);
+        final Schema updatedSchema;
+        if (operation == Operation.MOVE) {
+            updatedSchema = moveSchema(operatingSchema);
+        } else {
+            updatedSchema = operatingSchema;
+        }
+        final Struct updatedValue = new Struct(updatedSchema);
+        for (Field field : updatedSchema.fields()) {
+            updatedValue.put(field, value.get(field.name()));
+        }
+        for (int i = 0; i < fields.size(); i++) {
+            String fieldName = fields.get(i);
+            String headerName = headers.get(i);
+            Object fieldValue = value.get(fieldName);
+            Schema fieldSchema = operatingSchema.field(fieldName).schema();
+            updatedHeaders.add(headerName, fieldValue, fieldSchema);
+        }
+        return newRecord(record, updatedSchema, updatedValue, updatedHeaders);
+    }
+
+    private Schema moveSchema(Schema operatingSchema) {
+        final Schema updatedSchema;
+        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(operatingSchema, SchemaBuilder.struct());
+        for (Field field : operatingSchema.fields()) {
+            if (!fields.contains(field.name())) {
+                builder.field(field.name(), field.schema());
+            }
+        }
+        updatedSchema = builder.build();
+        return updatedSchema;
+    }
+
+    private R applySchemaless(R record, Object operatingValue) {
+        Headers updatedHeaders = record.headers().duplicate();
+        Map<String, Object> value = Requirements.requireMap(operatingValue, "header " + operation);
+        Map<String, Object> updatedValue = new HashMap<>(value);
+        for (int i = 0; i < fields.size(); i++) {
+            String fieldName = fields.get(i);
+            Object fieldValue = value.get(fieldName);
+            String headerName = headers.get(i);
+            if (operation == Operation.MOVE) {
+                updatedValue.remove(fieldName);
+            }
+            updatedHeaders.add(headerName, fieldValue, null);
+        }
+        return newRecord(record, null, updatedValue, updatedHeaders);
+    }
+
+    protected abstract Object operatingValue(R record);
+    protected abstract Schema operatingSchema(R record);
+    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue, Iterable<Header> updatedHeaders);
+
+    public static class Key<R extends ConnectRecord<R>> extends HeaderFrom<R> {
+
+        @Override
+        public Object operatingValue(R record) {
+            return record.key();
+        }
+
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.keySchema();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue, Iterable<Header> updatedHeaders) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue,
+                    record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
+        }
+    }
+
+    public static class Value<R extends ConnectRecord<R>> extends HeaderFrom<R> {
+
+        @Override
+        public Object operatingValue(R record) {
+            return record.value();
+        }
+
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.valueSchema();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue, Iterable<Header> updatedHeaders) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+                    updatedSchema, updatedValue, record.timestamp(), updatedHeaders);
+        }
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+        fields = config.getList(FIELDS_FIELD);
+        headers = config.getList(HEADERS_FIELD);
+        if (headers.size() != fields.size()) {
+            throw new ConfigException("'fields' config must have the same number of elements as 'headers' config.");

Review comment:
       Probably want to refer to `FIELDS_FIELD` and `HEADERS_FIELD` here instead of the literal `"fields"` and `"headers"` strings.

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE("move"),
+        COPY("copy");
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case "move":
+                    return MOVE;
+                case "copy":
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+
+        public String toString() {
+            return name;
+        }
+    }
+
+    private List<String> fields;
+
+    private List<String> headers;
+
+    private Operation operation;
+
+    @Override
+    public R apply(R record) {
+        Object operatingValue = operatingValue(record);
+        Schema operatingSchema = operatingSchema(record);
+
+        if (operatingSchema == null) {
+            return applySchemaless(record, operatingValue);
+        } else {
+            return applyWithSchema(record, operatingValue, operatingSchema);
+        }
+    }
+
+    private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
+        Headers updatedHeaders = record.headers().duplicate();
+        Struct value = Requirements.requireStruct(operatingValue, "header " + operation);
+        final Schema updatedSchema;
+        if (operation == Operation.MOVE) {
+            updatedSchema = moveSchema(operatingSchema);
+        } else {
+            updatedSchema = operatingSchema;
+        }
+        final Struct updatedValue = new Struct(updatedSchema);
+        for (Field field : updatedSchema.fields()) {
+            updatedValue.put(field, value.get(field.name()));
+        }
+        for (int i = 0; i < fields.size(); i++) {
+            String fieldName = fields.get(i);
+            String headerName = headers.get(i);
+            Object fieldValue = value.get(fieldName);
+            Schema fieldSchema = operatingSchema.field(fieldName).schema();
+            updatedHeaders.add(headerName, fieldValue, fieldSchema);
+        }
+        return newRecord(record, updatedSchema, updatedValue, updatedHeaders);
+    }
+
+    private Schema moveSchema(Schema operatingSchema) {
+        final Schema updatedSchema;
+        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(operatingSchema, SchemaBuilder.struct());
+        for (Field field : operatingSchema.fields()) {
+            if (!fields.contains(field.name())) {
+                builder.field(field.name(), field.schema());
+            }
+        }
+        updatedSchema = builder.build();
+        return updatedSchema;
+    }

Review comment:
       This looks fairly expensive to perform for every record. Do you think it might make sense to perform some caching, similarly to what's done in the [ReplaceField](https://github.com/apache/kafka/blob/d61dc0c1832935ae680388dcb8c12d1250dece33/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java#L169-L173) transform?

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE("move"),
+        COPY("copy");
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case "move":
+                    return MOVE;
+                case "copy":
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+
+        public String toString() {
+            return name;
+        }
+    }
+
+    private List<String> fields;
+
+    private List<String> headers;
+
+    private Operation operation;
+
+    @Override
+    public R apply(R record) {
+        Object operatingValue = operatingValue(record);
+        Schema operatingSchema = operatingSchema(record);
+
+        if (operatingSchema == null) {
+            return applySchemaless(record, operatingValue);
+        } else {
+            return applyWithSchema(record, operatingValue, operatingSchema);
+        }
+    }
+
+    private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
+        Headers updatedHeaders = record.headers().duplicate();
+        Struct value = Requirements.requireStruct(operatingValue, "header " + operation);
+        final Schema updatedSchema;
+        if (operation == Operation.MOVE) {
+            updatedSchema = moveSchema(operatingSchema);
+        } else {
+            updatedSchema = operatingSchema;
+        }
+        final Struct updatedValue = new Struct(updatedSchema);
+        for (Field field : updatedSchema.fields()) {
+            updatedValue.put(field, value.get(field.name()));
+        }

Review comment:
       Does this mean we'd be iterating over every field of every record? Not very performant 🙁 
   
   I think we can improve the efficiency here for the `COPY` case, if not the `MOVE` case, by reusing the existing value and not creating a new one.
   
   In the `MOVE` case though, it might be the best we have to settle for until/unless there's a richer API added for mutating Connect records and their keys, values, schemas, etc.

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Map;
+
+public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Add a header to each record.";
+
+    public static final String HEADER_FIELD = "header";
+    public static final String VALUE_LITERAL_FIELD = "value.literal";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HEADER_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The name of the header.")
+            .define(VALUE_LITERAL_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The literal value that is to be set as the header value on all records.");
+
+    private String header;
+
+    private String literalValue;
+
+    @Override
+    public R apply(R record) {
+        Headers updatedHeaders = record.headers().duplicate();
+        updatedHeaders.add(header, literalValue, Schema.STRING_SCHEMA);

Review comment:
       If we use the `Values` class to parse the value literal, we can use the schema that it provides instead of hardcoding this to use a string schema.

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Map;
+
+public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Add a header to each record.";
+
+    public static final String HEADER_FIELD = "header";
+    public static final String VALUE_LITERAL_FIELD = "value.literal";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HEADER_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The name of the header.")
+            .define(VALUE_LITERAL_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The literal value that is to be set as the header value on all records.");
+
+    private String header;
+
+    private String literalValue;
+
+    @Override
+    public R apply(R record) {
+        Headers updatedHeaders = record.headers().duplicate();

Review comment:
       Same question here RE: duplication




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r523147909



##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE("move"),
+        COPY("copy");
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case "move":
+                    return MOVE;
+                case "copy":
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+
+        public String toString() {
+            return name;
+        }
+    }
+
+    private List<String> fields;
+
+    private List<String> headers;
+
+    private Operation operation;
+
+    @Override
+    public R apply(R record) {
+        Object operatingValue = operatingValue(record);
+        Schema operatingSchema = operatingSchema(record);
+
+        if (operatingSchema == null) {
+            return applySchemaless(record, operatingValue);
+        } else {
+            return applyWithSchema(record, operatingValue, operatingSchema);
+        }
+    }
+
+    private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
+        Headers updatedHeaders = record.headers().duplicate();

Review comment:
       Ah, that's fair. Does seem to be the pattern followed by the other out-of-the-box transformations as well; probably best to continue to follow that pattern.
   
   I'm a little unnerved by this though, since as far as I can tell it's not publicly documented and so it's possible people writing their own transformations may be violating this implicit rule.
   
   Out of scope, so I've filed [KAFKA-10720](https://issues.apache.org/jira/browse/KAFKA-10720) to track the need for possible documentation improvements.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org