Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/09 19:48:25 UTC

[GitHub] [kafka] C0urante commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

C0urante commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r520056667



##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE("move"),
+        COPY("copy");
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case "move":
+                    return MOVE;
+                case "copy":
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+
+        public String toString() {
+            return name;
+        }
+    }
+
+    private List<String> fields;
+
+    private List<String> headers;
+
+    private Operation operation;
+
+    @Override
+    public R apply(R record) {
+        Object operatingValue = operatingValue(record);
+        Schema operatingSchema = operatingSchema(record);
+
+        if (operatingSchema == null) {
+            return applySchemaless(record, operatingValue);
+        } else {
+            return applyWithSchema(record, operatingValue, operatingSchema);
+        }
+    }
+
+    private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
+        Headers updatedHeaders = record.headers().duplicate();

Review comment:
       Why duplicate headers here? According to the [Headers interface's Javadocs](https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/header/Headers.html), the collection should be mutable.
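    For illustration, a rough sketch of the alternative (hypothetical, untested; it assumes nothing else still holds a reference to the record's original `Headers`):

    ```java
    // Hypothetical sketch: Headers is documented as mutable, so in principle
    // the transform could append to the record's own collection and skip the copy.
    Headers updatedHeaders = record.headers();          // no duplicate()
    // ... later, inside the loop over the configured fields:
    // updatedHeaders.add(headerName, fieldValue, fieldSchema);
    ```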

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");

Review comment:
       Might make sense to refer to the `MOVE.name` and `COPY.name` fields declared in the `Operation` enum below instead of using the literal `"move"` and `"copy"` strings in this section.
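    Rough sketch (hypothetical; the private `name` field is reachable here because `Operation` is a nested enum of `HeaderFrom`):

    ```java
    .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
            ConfigDef.ValidString.in(Operation.MOVE.name, Operation.COPY.name),
            ConfigDef.Importance.HIGH,
            "Either <code>" + Operation.MOVE.name + "</code> if the fields are to be moved to the headers "
                    + "(removed from the key/value), or <code>" + Operation.COPY.name + "</code> if the "
                    + "fields are to be copied to the headers (retained in the key/value).");
    ```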

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.List;
+import java.util.Map;
+
+public class DropHeaders<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Removes one or more headers from each record.";
+
+    public static final String HEADERS_FIELD = "header.names";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "The name of the headers to be removed.");
+
+    private List<String> headers;
+
+    @Override
+    public R apply(R record) {
+        Headers updatedHeaders = record.headers().duplicate();

Review comment:
       Same question here RE: duplication

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Map;
+
+public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Add a header to each record.";
+
+    public static final String HEADER_FIELD = "header";
+    public static final String VALUE_LITERAL_FIELD = "value.literal";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HEADER_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The name of the header.")
+            .define(VALUE_LITERAL_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The literal value that is to be set as the header value on all records.");
+
+    private String header;
+
+    private String literalValue;

Review comment:
       I think this is supposed to be parsed using the `Values` class, which would allow users to specify integral, floating point, boolean, and even structured and nested types here that would then be correctly picked up by the framework and used in the resulting header.
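    Something along these lines, perhaps (hypothetical sketch; `Values.parseString` returns a `SchemaAndValue` carrying the inferred type):

    ```java
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.data.Values;

    // Hypothetical sketch: parse the configured literal so that e.g. "42",
    // "true", or "[1, 2, 3]" keep their inferred types instead of staying strings.
    private SchemaAndValue literalValue;

    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
        header = config.getString(HEADER_FIELD);
        literalValue = Values.parseString(config.getString(VALUE_LITERAL_FIELD));
    }
    ```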

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE("move"),
+        COPY("copy");
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case "move":
+                    return MOVE;
+                case "copy":
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+
+        public String toString() {
+            return name;
+        }
+    }
+
+    private List<String> fields;
+
+    private List<String> headers;
+
+    private Operation operation;
+
+    @Override
+    public R apply(R record) {
+        Object operatingValue = operatingValue(record);
+        Schema operatingSchema = operatingSchema(record);
+
+        if (operatingSchema == null) {
+            return applySchemaless(record, operatingValue);
+        } else {
+            return applyWithSchema(record, operatingValue, operatingSchema);
+        }
+    }
+
+    private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
+        Headers updatedHeaders = record.headers().duplicate();
+        Struct value = Requirements.requireStruct(operatingValue, "header " + operation);
+        final Schema updatedSchema;
+        if (operation == Operation.MOVE) {
+            updatedSchema = moveSchema(operatingSchema);
+        } else {
+            updatedSchema = operatingSchema;
+        }
+        final Struct updatedValue = new Struct(updatedSchema);
+        for (Field field : updatedSchema.fields()) {
+            updatedValue.put(field, value.get(field.name()));
+        }
+        for (int i = 0; i < fields.size(); i++) {
+            String fieldName = fields.get(i);
+            String headerName = headers.get(i);
+            Object fieldValue = value.get(fieldName);

Review comment:
       Worth noting that if there's no field with this name in the record's schema, this will throw an exception and (unless the connector is configured with an error tolerance that ignores such issues) fail the task.
   
   This isn't unreasonable behavior, especially given that that case isn't covered in the KIP, but it's different from what happens right now in the `applySchemaless` method, which is to silently add a null header instead.
   
   It's probably best to ensure the same behavior happens in either case. I don't have a strong preference either way but I'm tentatively leaning towards silently adding `null` headers since it'll make working with heterogeneous topics easier.
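    A sketch of the null-header option, for what it's worth (hypothetical, untested):

    ```java
    // Hypothetical sketch: mirror applySchemaless by silently adding a null
    // header when the field is absent from the schema, instead of throwing.
    Field field = operatingSchema.field(fieldName);
    if (field == null) {
        updatedHeaders.add(headerName, null, null);
    } else {
        updatedHeaders.add(headerName, value.get(field), field.schema());
    }
    ```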

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.List;
+import java.util.Map;
+
+public class DropHeaders<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Removes one or more headers from each record.";
+
+    public static final String HEADERS_FIELD = "header.names";

Review comment:
       Looks like this has two different names in the KIP; it's referenced as `headers` in the textual description but as `header.names` in the sample config.
   
    I think `headers` makes more sense here given that it would align with the `headers` property defined in the `HeaderFrom` transformation (and the singular `header` in `InsertHeader`).

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE("move"),
+        COPY("copy");
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case "move":
+                    return MOVE;
+                case "copy":
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+
+        public String toString() {
+            return name;
+        }
+    }
+
+    private List<String> fields;
+
+    private List<String> headers;
+
+    private Operation operation;
+
+    @Override
+    public R apply(R record) {
+        Object operatingValue = operatingValue(record);
+        Schema operatingSchema = operatingSchema(record);
+
+        if (operatingSchema == null) {
+            return applySchemaless(record, operatingValue);
+        } else {
+            return applyWithSchema(record, operatingValue, operatingSchema);
+        }
+    }
+
+    private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
+        Headers updatedHeaders = record.headers().duplicate();
+        Struct value = Requirements.requireStruct(operatingValue, "header " + operation);
+        final Schema updatedSchema;
+        if (operation == Operation.MOVE) {
+            updatedSchema = moveSchema(operatingSchema);
+        } else {
+            updatedSchema = operatingSchema;
+        }
+        final Struct updatedValue = new Struct(updatedSchema);
+        for (Field field : updatedSchema.fields()) {
+            updatedValue.put(field, value.get(field.name()));
+        }
+        for (int i = 0; i < fields.size(); i++) {
+            String fieldName = fields.get(i);
+            String headerName = headers.get(i);
+            Object fieldValue = value.get(fieldName);
+            Schema fieldSchema = operatingSchema.field(fieldName).schema();
+            updatedHeaders.add(headerName, fieldValue, fieldSchema);
+        }
+        return newRecord(record, updatedSchema, updatedValue, updatedHeaders);
+    }
+
+    private Schema moveSchema(Schema operatingSchema) {
+        final Schema updatedSchema;
+        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(operatingSchema, SchemaBuilder.struct());
+        for (Field field : operatingSchema.fields()) {
+            if (!fields.contains(field.name())) {
+                builder.field(field.name(), field.schema());
+            }
+        }
+        updatedSchema = builder.build();
+        return updatedSchema;
+    }
+
+    private R applySchemaless(R record, Object operatingValue) {
+        Headers updatedHeaders = record.headers().duplicate();
+        Map<String, Object> value = Requirements.requireMap(operatingValue, "header " + operation);
+        Map<String, Object> updatedValue = new HashMap<>(value);
+        for (int i = 0; i < fields.size(); i++) {
+            String fieldName = fields.get(i);
+            Object fieldValue = value.get(fieldName);
+            String headerName = headers.get(i);
+            if (operation == Operation.MOVE) {
+                updatedValue.remove(fieldName);
+            }
+            updatedHeaders.add(headerName, fieldValue, null);
+        }
+        return newRecord(record, null, updatedValue, updatedHeaders);
+    }
+
+    protected abstract Object operatingValue(R record);
+    protected abstract Schema operatingSchema(R record);
+    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue, Iterable<Header> updatedHeaders);
+
+    public static class Key<R extends ConnectRecord<R>> extends HeaderFrom<R> {
+
+        @Override
+        public Object operatingValue(R record) {
+            return record.key();
+        }
+
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.keySchema();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue, Iterable<Header> updatedHeaders) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue,
+                    record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
+        }
+    }
+
+    public static class Value<R extends ConnectRecord<R>> extends HeaderFrom<R> {
+
+        @Override
+        public Object operatingValue(R record) {
+            return record.value();
+        }
+
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.valueSchema();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue, Iterable<Header> updatedHeaders) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+                    updatedSchema, updatedValue, record.timestamp(), updatedHeaders);
+        }
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+        fields = config.getList(FIELDS_FIELD);
+        headers = config.getList(HEADERS_FIELD);
+        if (headers.size() != fields.size()) {
+            throw new ConfigException("'fields' config must have the same number of elements as 'headers' config.");

Review comment:
       Probably want to refer to `FIELDS_FIELD` and `HEADERS_FIELD` here instead of the literal `"fields"` and `"headers"` strings.
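    e.g.:

    ```java
    throw new ConfigException("'" + FIELDS_FIELD + "' config must have the same number of elements as '"
            + HEADERS_FIELD + "' config.");
    ```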

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE("move"),
+        COPY("copy");
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case "move":
+                    return MOVE;
+                case "copy":
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+
+        public String toString() {
+            return name;
+        }
+    }
+
+    private List<String> fields;
+
+    private List<String> headers;
+
+    private Operation operation;
+
+    @Override
+    public R apply(R record) {
+        Object operatingValue = operatingValue(record);
+        Schema operatingSchema = operatingSchema(record);
+
+        if (operatingSchema == null) {
+            return applySchemaless(record, operatingValue);
+        } else {
+            return applyWithSchema(record, operatingValue, operatingSchema);
+        }
+    }
+
+    private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
+        Headers updatedHeaders = record.headers().duplicate();
+        Struct value = Requirements.requireStruct(operatingValue, "header " + operation);
+        final Schema updatedSchema;
+        if (operation == Operation.MOVE) {
+            updatedSchema = moveSchema(operatingSchema);
+        } else {
+            updatedSchema = operatingSchema;
+        }
+        final Struct updatedValue = new Struct(updatedSchema);
+        for (Field field : updatedSchema.fields()) {
+            updatedValue.put(field, value.get(field.name()));
+        }
+        for (int i = 0; i < fields.size(); i++) {
+            String fieldName = fields.get(i);
+            String headerName = headers.get(i);
+            Object fieldValue = value.get(fieldName);
+            Schema fieldSchema = operatingSchema.field(fieldName).schema();
+            updatedHeaders.add(headerName, fieldValue, fieldSchema);
+        }
+        return newRecord(record, updatedSchema, updatedValue, updatedHeaders);
+    }
+
+    private Schema moveSchema(Schema operatingSchema) {
+        final Schema updatedSchema;
+        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(operatingSchema, SchemaBuilder.struct());
+        for (Field field : operatingSchema.fields()) {
+            if (!fields.contains(field.name())) {
+                builder.field(field.name(), field.schema());
+            }
+        }
+        updatedSchema = builder.build();
+        return updatedSchema;
+    }

Review comment:
       This looks fairly expensive to perform for every record. Do you think it might make sense to perform some caching, similarly to what's done in the [ReplaceField](https://github.com/apache/kafka/blob/d61dc0c1832935ae680388dcb8c12d1250dece33/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java#L169-L173) transform?
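    A rough sketch of that approach, borrowing the cache setup from `ReplaceField` (hypothetical; the cache size and the `moveSchemaCache` name are illustrative):

    ```java
    import org.apache.kafka.common.cache.Cache;
    import org.apache.kafka.common.cache.LRUCache;
    import org.apache.kafka.common.cache.SynchronizedCache;

    // Hypothetical sketch: cache the derived schema per input schema, so the
    // rebuild happens once per distinct schema rather than once per record.
    private Cache<Schema, Schema> moveSchemaCache;
    // in configure(): moveSchemaCache = new SynchronizedCache<>(new LRUCache<>(16));

    private Schema moveSchema(Schema operatingSchema) {
        Schema updatedSchema = moveSchemaCache.get(operatingSchema);
        if (updatedSchema == null) {
            final SchemaBuilder builder = SchemaUtil.copySchemaBasics(operatingSchema, SchemaBuilder.struct());
            for (Field field : operatingSchema.fields()) {
                if (!fields.contains(field.name())) {
                    builder.field(field.name(), field.schema());
                }
            }
            updatedSchema = builder.build();
            moveSchemaCache.put(operatingSchema, updatedSchema);
        }
        return updatedSchema;
    }
    ```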

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE("move"),
+        COPY("copy");
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case "move":
+                    return MOVE;
+                case "copy":
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+
+        public String toString() {
+            return name;
+        }
+    }
+
+    private List<String> fields;
+
+    private List<String> headers;
+
+    private Operation operation;
+
+    @Override
+    public R apply(R record) {
+        Object operatingValue = operatingValue(record);
+        Schema operatingSchema = operatingSchema(record);
+
+        if (operatingSchema == null) {
+            return applySchemaless(record, operatingValue);
+        } else {
+            return applyWithSchema(record, operatingValue, operatingSchema);
+        }
+    }
+
+    private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
+        Headers updatedHeaders = record.headers().duplicate();
+        Struct value = Requirements.requireStruct(operatingValue, "header " + operation);
+        final Schema updatedSchema;
+        if (operation == Operation.MOVE) {
+            updatedSchema = moveSchema(operatingSchema);
+        } else {
+            updatedSchema = operatingSchema;
+        }
+        final Struct updatedValue = new Struct(updatedSchema);
+        for (Field field : updatedSchema.fields()) {
+            updatedValue.put(field, value.get(field.name()));
+        }

Review comment:
       Does this mean we'd be iterating over every field of every record? Not very performant 🙁 
   
   I think we can improve the efficiency here for the `COPY` case, if not the `MOVE` case, by reusing the existing value and not creating a new one.
   
   In the `MOVE` case though, it might be the best we have to settle for until/unless there's a richer API added for mutating Connect records and their keys, values, schemas, etc.
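    Sketch of the `COPY` fast path (hypothetical, untested):

    ```java
    // Hypothetical sketch: for COPY the schema is unchanged, so the original
    // Struct can be passed through as-is; only MOVE needs a rebuilt value.
    final Schema updatedSchema;
    final Struct updatedValue;
    if (operation == Operation.MOVE) {
        updatedSchema = moveSchema(operatingSchema);
        updatedValue = new Struct(updatedSchema);
        for (Field field : updatedSchema.fields()) {
            updatedValue.put(field, value.get(field.name()));
        }
    } else {
        updatedSchema = operatingSchema;
        updatedValue = value;
    }
    ```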

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Map;
+
+public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Add a header to each record.";
+
+    public static final String HEADER_FIELD = "header";
+    public static final String VALUE_LITERAL_FIELD = "value.literal";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HEADER_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The name of the header.")
+            .define(VALUE_LITERAL_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The literal value that is to be set as the header value on all records.");
+
+    private String header;
+
+    private String literalValue;
+
+    @Override
+    public R apply(R record) {
+        Headers updatedHeaders = record.headers().duplicate();
+        updatedHeaders.add(header, literalValue, Schema.STRING_SCHEMA);

Review comment:
       If we use the `Values` class to parse the value literal, we can use the schema that it provides instead of hardcoding this to use a string schema.
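    i.e., assuming `literalValue` were parsed into a `SchemaAndValue` in `configure()` as suggested above:

    ```java
    // Hypothetical sketch: let the parsed literal supply both value and schema.
    updatedHeaders.add(header, literalValue.value(), literalValue.schema());
    // or, equivalently:
    // updatedHeaders.add(header, literalValue);
    ```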

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Map;
+
+public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Add a header to each record.";
+
+    public static final String HEADER_FIELD = "header";
+    public static final String VALUE_LITERAL_FIELD = "value.literal";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HEADER_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The name of the header.")
+            .define(VALUE_LITERAL_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The literal value that is to be set as the header value on all records.");
+
+    private String header;
+
+    private String literalValue;
+
+    @Override
+    public R apply(R record) {
+        Headers updatedHeaders = record.headers().duplicate();

Review comment:
       Same question here RE: duplication




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org