You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2017/06/14 22:24:36 UTC

kafka git commit: KAFKA-5448: Change TimestampConverter configuration name to avoid conflicting with reserved 'type' configuration used by all Transformations

Repository: kafka
Updated Branches:
  refs/heads/trunk 3c2329ad0 -> b760615f3


KAFKA-5448: Change TimestampConverter configuration name to avoid conflicting with reserved 'type' configuration used by all Transformations

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: David Tucker, Gwen Shapira

Closes #3342 from ewencp/kafka-5448-change-timestamp-converter-config-name


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b760615f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b760615f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b760615f

Branch: refs/heads/trunk
Commit: b760615f3d58ed1fd4e17bb3ec647b26135cf458
Parents: 3c2329a
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Jun 14 15:24:32 2017 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Jun 14 15:24:32 2017 -0700

----------------------------------------------------------------------
 .../runtime/TransformationConfigTest.java       | 211 +++++++++++++++++++
 .../connect/transforms/TimestampConverter.java  |   6 +-
 .../transforms/TimestampConverterTest.java      |  48 ++---
 3 files changed, 238 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b760615f/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java
new file mode 100644
index 0000000..5cd0bcb
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.tools.MockConnector;
+import org.apache.kafka.connect.transforms.Cast;
+import org.apache.kafka.connect.transforms.ExtractField;
+import org.apache.kafka.connect.transforms.Flatten;
+import org.apache.kafka.connect.transforms.HoistField;
+import org.apache.kafka.connect.transforms.InsertField;
+import org.apache.kafka.connect.transforms.MaskField;
+import org.apache.kafka.connect.transforms.RegexRouter;
+import org.apache.kafka.connect.transforms.ReplaceField;
+import org.apache.kafka.connect.transforms.SetSchemaMetadata;
+import org.apache.kafka.connect.transforms.TimestampConverter;
+import org.apache.kafka.connect.transforms.TimestampRouter;
+import org.apache.kafka.connect.transforms.ValueToKey;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+/**
+ * Tests that transformations' configs can be composed with ConnectorConfig during its construction, ensuring no
+ * conflicting fields or other issues.
+ *
+ * This test appears here simply because it requires both connect-runtime and connect-transforms and connect-runtime
+ * already depends on connect-transforms.
+ */
+public class TransformationConfigTest {
+
+    @Test
+    public void testEmbeddedConfigCast() {
+        // Validate that we can construct a Connector config containing the extended config for the transform
+        HashMap<String, String> connProps = new HashMap<>();
+        connProps.put("name", "foo");
+        connProps.put("connector.class", MockConnector.class.getName());
+        connProps.put("transforms", "example");
+        connProps.put("transforms.example.type", Cast.Value.class.getName());
+        connProps.put("transforms.example.spec", "int8");
+
+        Plugins plugins = null; // Safe when we're only constructing the config
+        new ConnectorConfig(plugins, connProps);
+    }
+
+    @Test
+    public void testEmbeddedConfigExtractField() {
+        // Validate that we can construct a Connector config containing the extended config for the transform
+        HashMap<String, String> connProps = new HashMap<>();
+        connProps.put("name", "foo");
+        connProps.put("connector.class", MockConnector.class.getName());
+        connProps.put("transforms", "example");
+        connProps.put("transforms.example.type", ExtractField.Value.class.getName());
+        connProps.put("transforms.example.field", "field");
+
+        Plugins plugins = null; // Safe when we're only constructing the config
+        new ConnectorConfig(plugins, connProps);
+    }
+
+    @Test
+    public void testEmbeddedConfigFlatten() {
+        // Validate that we can construct a Connector config containing the extended config for the transform
+        HashMap<String, String> connProps = new HashMap<>();
+        connProps.put("name", "foo");
+        connProps.put("connector.class", MockConnector.class.getName());
+        connProps.put("transforms", "example");
+        connProps.put("transforms.example.type", Flatten.Value.class.getName());
+
+        Plugins plugins = null; // Safe when we're only constructing the config
+        new ConnectorConfig(plugins, connProps);
+    }
+
+    @Test
+    public void testEmbeddedConfigHoistField() {
+        // Validate that we can construct a Connector config containing the extended config for the transform
+        HashMap<String, String> connProps = new HashMap<>();
+        connProps.put("name", "foo");
+        connProps.put("connector.class", MockConnector.class.getName());
+        connProps.put("transforms", "example");
+        connProps.put("transforms.example.type", HoistField.Value.class.getName());
+        connProps.put("transforms.example.field", "field");
+
+        Plugins plugins = null; // Safe when we're only constructing the config
+        new ConnectorConfig(plugins, connProps);
+    }
+
+    @Test
+    public void testEmbeddedConfigInsertField() {
+        // Validate that we can construct a Connector config containing the extended config for the transform
+        HashMap<String, String> connProps = new HashMap<>();
+        connProps.put("name", "foo");
+        connProps.put("connector.class", MockConnector.class.getName());
+        connProps.put("transforms", "example");
+        connProps.put("transforms.example.type", InsertField.Value.class.getName());
+
+        Plugins plugins = null; // Safe when we're only constructing the config
+        new ConnectorConfig(plugins, connProps);
+    }
+
+    @Test
+    public void testEmbeddedConfigMaskField() {
+        // Validate that we can construct a Connector config containing the extended config for the transform
+        HashMap<String, String> connProps = new HashMap<>();
+        connProps.put("name", "foo");
+        connProps.put("connector.class", MockConnector.class.getName());
+        connProps.put("transforms", "example");
+        connProps.put("transforms.example.type", MaskField.Value.class.getName());
+        connProps.put("transforms.example.fields", "field");
+
+
+        Plugins plugins = null; // Safe when we're only constructing the config
+        new ConnectorConfig(plugins, connProps);
+    }
+
+    @Test
+    public void testEmbeddedConfigRegexRouter() {
+        // Validate that we can construct a Connector config containing the extended config for the transform
+        HashMap<String, String> connProps = new HashMap<>();
+        connProps.put("name", "foo");
+        connProps.put("connector.class", MockConnector.class.getName());
+        connProps.put("transforms", "example");
+        connProps.put("transforms.example.type", RegexRouter.class.getName());
+        connProps.put("transforms.example.regex", "(.*)");
+        connProps.put("transforms.example.replacement", "prefix-$1");
+
+        Plugins plugins = null; // Safe when we're only constructing the config
+        new ConnectorConfig(plugins, connProps);
+    }
+
+    @Test
+    public void testEmbeddedConfigReplaceField() {
+        // Validate that we can construct a Connector config containing the extended config for the transform
+        HashMap<String, String> connProps = new HashMap<>();
+        connProps.put("name", "foo");
+        connProps.put("connector.class", MockConnector.class.getName());
+        connProps.put("transforms", "example");
+        connProps.put("transforms.example.type", ReplaceField.Value.class.getName());
+
+        Plugins plugins = null; // Safe when we're only constructing the config
+        new ConnectorConfig(plugins, connProps);
+    }
+
+    @Test
+    public void testEmbeddedConfigSetSchemaMetadata() {
+        // Validate that we can construct a Connector config containing the extended config for the transform
+        HashMap<String, String> connProps = new HashMap<>();
+        connProps.put("name", "foo");
+        connProps.put("connector.class", MockConnector.class.getName());
+        connProps.put("transforms", "example");
+        connProps.put("transforms.example.type", SetSchemaMetadata.Value.class.getName());
+
+        Plugins plugins = null; // Safe when we're only constructing the config
+        new ConnectorConfig(plugins, connProps);
+    }
+
+    @Test
+    public void testEmbeddedConfigTimestampConverter() {
+        // Validate that we can construct a Connector config containing the extended config for the transform
+        HashMap<String, String> connProps = new HashMap<>();
+        connProps.put("name", "foo");
+        connProps.put("connector.class", MockConnector.class.getName());
+        connProps.put("transforms", "example");
+        connProps.put("transforms.example.type", TimestampConverter.Value.class.getName());
+        connProps.put("transforms.example.target.type", "unix");
+
+        Plugins plugins = null; // Safe when we're only constructing the config
+        new ConnectorConfig(plugins, connProps);
+    }
+
+    @Test
+    public void testEmbeddedConfigTimestampRouter() {
+        // Validate that we can construct a Connector config containing the extended config for the transform
+        HashMap<String, String> connProps = new HashMap<>();
+        connProps.put("name", "foo");
+        connProps.put("connector.class", MockConnector.class.getName());
+        connProps.put("transforms", "example");
+        connProps.put("transforms.example.type", TimestampRouter.class.getName());
+
+        Plugins plugins = null; // Safe when we're only constructing the config
+        new ConnectorConfig(plugins, connProps);
+    }
+
+    @Test
+    public void testEmbeddedConfigValueToKey() {
+        // Validate that we can construct a Connector config containing the extended config for the transform
+        HashMap<String, String> connProps = new HashMap<>();
+        connProps.put("name", "foo");
+        connProps.put("connector.class", MockConnector.class.getName());
+        connProps.put("transforms", "example");
+        connProps.put("transforms.example.type", ValueToKey.class.getName());
+        connProps.put("transforms.example.fields", "field");
+
+        Plugins plugins = null; // Safe when we're only constructing the config
+        new ConnectorConfig(plugins, connProps);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b760615f/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
index ce7d002..8557441 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -60,7 +60,7 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
     public static final String FIELD_CONFIG = "field";
     private static final String FIELD_DEFAULT = "";
 
-    public static final String TYPE_CONFIG = "type";
+    public static final String TARGET_TYPE_CONFIG = "target.type";
 
     public static final String FORMAT_CONFIG = "format";
     private static final String FORMAT_DEFAULT = "";
@@ -68,7 +68,7 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(FIELD_CONFIG, ConfigDef.Type.STRING, FIELD_DEFAULT, ConfigDef.Importance.HIGH,
                     "The field containing the timestamp, or empty if the entire value is a timestamp")
-            .define(TYPE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+            .define(TARGET_TYPE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                     "The desired timestamp representation: string, unix, Date, Time, or Timestamp")
             .define(FORMAT_CONFIG, ConfigDef.Type.STRING, FORMAT_DEFAULT, ConfigDef.Importance.MEDIUM,
                     "A SimpleDateFormat-compatible format for the timestamp. Used to generate the output when type=string "
@@ -243,7 +243,7 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
     public void configure(Map<String, ?> configs) {
         final SimpleConfig simpleConfig = new SimpleConfig(CONFIG_DEF, configs);
         final String field = simpleConfig.getString(FIELD_CONFIG);
-        final String type = simpleConfig.getString(TYPE_CONFIG);
+        final String type = simpleConfig.getString(TARGET_TYPE_CONFIG);
         String formatPattern = simpleConfig.getString(FORMAT_CONFIG);
         schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b760615f/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
index 1b93874..12beef8 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
@@ -82,20 +82,20 @@ public class TimestampConverterTest {
     @Test(expected = ConfigException.class)
     public void testConfigInvalidTargetType() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "invalid"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "invalid"));
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigMissingFormat() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "string"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "string"));
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigInvalidFormat() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
-        config.put(TimestampConverter.TYPE_CONFIG, "string");
+        config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
         config.put(TimestampConverter.FORMAT_CONFIG, "bad-format");
         xform.configure(config);
     }
@@ -106,7 +106,7 @@ public class TimestampConverterTest {
     @Test
     public void testSchemalessIdentity() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
@@ -116,7 +116,7 @@ public class TimestampConverterTest {
     @Test
     public void testSchemalessTimestampToDate() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Date"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
@@ -126,7 +126,7 @@ public class TimestampConverterTest {
     @Test
     public void testSchemalessTimestampToTime() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Time"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
@@ -136,7 +136,7 @@ public class TimestampConverterTest {
     @Test
     public void testSchemalessTimestampToUnix() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "unix"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
@@ -147,7 +147,7 @@ public class TimestampConverterTest {
     public void testSchemalessTimestampToString() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
-        config.put(TimestampConverter.TYPE_CONFIG, "string");
+        config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
         xform.configure(config);
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
@@ -162,7 +162,7 @@ public class TimestampConverterTest {
     @Test
     public void testSchemalessDateToTimestamp() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE.getTime()));
 
         assertNull(transformed.valueSchema());
@@ -173,7 +173,7 @@ public class TimestampConverterTest {
     @Test
     public void testSchemalessTimeToTimestamp() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, TIME.getTime()));
 
         assertNull(transformed.valueSchema());
@@ -184,7 +184,7 @@ public class TimestampConverterTest {
     @Test
     public void testSchemalessUnixToTimestamp() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_UNIX));
 
         assertNull(transformed.valueSchema());
@@ -195,7 +195,7 @@ public class TimestampConverterTest {
     public void testSchemalessStringToTimestamp() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
-        config.put(TimestampConverter.TYPE_CONFIG, "Timestamp");
+        config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
         xform.configure(config);
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_STRING));
@@ -210,7 +210,7 @@ public class TimestampConverterTest {
     @Test
     public void testWithSchemaIdentity() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
@@ -220,7 +220,7 @@ public class TimestampConverterTest {
     @Test
     public void testWithSchemaTimestampToDate() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Date"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
 
         assertEquals(Date.SCHEMA, transformed.valueSchema());
@@ -230,7 +230,7 @@ public class TimestampConverterTest {
     @Test
     public void testWithSchemaTimestampToTime() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Time"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
 
         assertEquals(Time.SCHEMA, transformed.valueSchema());
@@ -240,7 +240,7 @@ public class TimestampConverterTest {
     @Test
     public void testWithSchemaTimestampToUnix() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "unix"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
 
         assertEquals(Schema.INT64_SCHEMA, transformed.valueSchema());
@@ -251,7 +251,7 @@ public class TimestampConverterTest {
     public void testWithSchemaTimestampToString() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
-        config.put(TimestampConverter.TYPE_CONFIG, "string");
+        config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
         xform.configure(config);
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
@@ -266,7 +266,7 @@ public class TimestampConverterTest {
     @Test
     public void testWithSchemaDateToTimestamp() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Date.SCHEMA, DATE.getTime()));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
@@ -277,7 +277,7 @@ public class TimestampConverterTest {
     @Test
     public void testWithSchemaTimeToTimestamp() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Time.SCHEMA, TIME.getTime()));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
@@ -288,7 +288,7 @@ public class TimestampConverterTest {
     @Test
     public void testWithSchemaUnixToTimestamp() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
@@ -299,7 +299,7 @@ public class TimestampConverterTest {
     public void testWithSchemaStringToTimestamp() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
-        config.put(TimestampConverter.TYPE_CONFIG, "Timestamp");
+        config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
         xform.configure(config);
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING));
@@ -315,7 +315,7 @@ public class TimestampConverterTest {
     public void testSchemalessFieldConversion() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
-        config.put(TimestampConverter.TYPE_CONFIG, "Date");
+        config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Date");
         config.put(TimestampConverter.FIELD_CONFIG, "ts");
         xform.configure(config);
 
@@ -330,7 +330,7 @@ public class TimestampConverterTest {
     public void testWithSchemaFieldConversion() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
-        config.put(TimestampConverter.TYPE_CONFIG, "Timestamp");
+        config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
         config.put(TimestampConverter.FIELD_CONFIG, "ts");
         xform.configure(config);
 
@@ -360,7 +360,7 @@ public class TimestampConverterTest {
     @Test
     public void testKey() {
         TimestampConverter<SourceRecord> xform = new TimestampConverter.Key<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime(), null, null));
 
         assertNull(transformed.keySchema());