You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2017/06/14 22:24:36 UTC
kafka git commit: KAFKA-5448: Change TimestampConverter configuration
name to avoid conflicting with reserved 'type' configuration used by all
Transformations
Repository: kafka
Updated Branches:
refs/heads/trunk 3c2329ad0 -> b760615f3
KAFKA-5448: Change TimestampConverter configuration name to avoid conflicting with reserved 'type' configuration used by all Transformations
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: David Tucker, Gwen Shapira
Closes #3342 from ewencp/kafka-5448-change-timestamp-converter-config-name
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b760615f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b760615f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b760615f
Branch: refs/heads/trunk
Commit: b760615f3d58ed1fd4e17bb3ec647b26135cf458
Parents: 3c2329a
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Jun 14 15:24:32 2017 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Jun 14 15:24:32 2017 -0700
----------------------------------------------------------------------
.../runtime/TransformationConfigTest.java | 211 +++++++++++++++++++
.../connect/transforms/TimestampConverter.java | 6 +-
.../transforms/TimestampConverterTest.java | 48 ++---
3 files changed, 238 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b760615f/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java
new file mode 100644
index 0000000..5cd0bcb
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.tools.MockConnector;
+import org.apache.kafka.connect.transforms.Cast;
+import org.apache.kafka.connect.transforms.ExtractField;
+import org.apache.kafka.connect.transforms.Flatten;
+import org.apache.kafka.connect.transforms.HoistField;
+import org.apache.kafka.connect.transforms.InsertField;
+import org.apache.kafka.connect.transforms.MaskField;
+import org.apache.kafka.connect.transforms.RegexRouter;
+import org.apache.kafka.connect.transforms.ReplaceField;
+import org.apache.kafka.connect.transforms.SetSchemaMetadata;
+import org.apache.kafka.connect.transforms.TimestampConverter;
+import org.apache.kafka.connect.transforms.TimestampRouter;
+import org.apache.kafka.connect.transforms.ValueToKey;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+/**
+ * Tests that transformations' configs can be composed with ConnectorConfig during its construction, ensuring no
+ * conflicting fields or other issues.
+ *
+ * This test appears here simply because it requires both connect-runtime and connect-transforms and connect-runtime
+ * already depends on connect-transforms.
+ */
+public class TransformationConfigTest {
+
+ @Test
+ public void testEmbeddedConfigCast() {
+ // Validate that we can construct a Connector config containing the extended config for the transform
+ HashMap<String, String> connProps = new HashMap<>();
+ connProps.put("name", "foo");
+ connProps.put("connector.class", MockConnector.class.getName());
+ connProps.put("transforms", "example");
+ connProps.put("transforms.example.type", Cast.Value.class.getName());
+ connProps.put("transforms.example.spec", "int8");
+
+ Plugins plugins = null; // Safe when we're only constructing the config
+ new ConnectorConfig(plugins, connProps);
+ }
+
+ @Test
+ public void testEmbeddedConfigExtractField() {
+ // Validate that we can construct a Connector config containing the extended config for the transform
+ HashMap<String, String> connProps = new HashMap<>();
+ connProps.put("name", "foo");
+ connProps.put("connector.class", MockConnector.class.getName());
+ connProps.put("transforms", "example");
+ connProps.put("transforms.example.type", ExtractField.Value.class.getName());
+ connProps.put("transforms.example.field", "field");
+
+ Plugins plugins = null; // Safe when we're only constructing the config
+ new ConnectorConfig(plugins, connProps);
+ }
+
+ @Test
+ public void testEmbeddedConfigFlatten() {
+ // Validate that we can construct a Connector config containing the extended config for the transform
+ HashMap<String, String> connProps = new HashMap<>();
+ connProps.put("name", "foo");
+ connProps.put("connector.class", MockConnector.class.getName());
+ connProps.put("transforms", "example");
+ connProps.put("transforms.example.type", Flatten.Value.class.getName());
+
+ Plugins plugins = null; // Safe when we're only constructing the config
+ new ConnectorConfig(plugins, connProps);
+ }
+
+ @Test
+ public void testEmbeddedConfigHoistField() {
+ // Validate that we can construct a Connector config containing the extended config for the transform
+ HashMap<String, String> connProps = new HashMap<>();
+ connProps.put("name", "foo");
+ connProps.put("connector.class", MockConnector.class.getName());
+ connProps.put("transforms", "example");
+ connProps.put("transforms.example.type", HoistField.Value.class.getName());
+ connProps.put("transforms.example.field", "field");
+
+ Plugins plugins = null; // Safe when we're only constructing the config
+ new ConnectorConfig(plugins, connProps);
+ }
+
+ @Test
+ public void testEmbeddedConfigInsertField() {
+ // Validate that we can construct a Connector config containing the extended config for the transform
+ HashMap<String, String> connProps = new HashMap<>();
+ connProps.put("name", "foo");
+ connProps.put("connector.class", MockConnector.class.getName());
+ connProps.put("transforms", "example");
+ connProps.put("transforms.example.type", InsertField.Value.class.getName());
+
+ Plugins plugins = null; // Safe when we're only constructing the config
+ new ConnectorConfig(plugins, connProps);
+ }
+
+ @Test
+ public void testEmbeddedConfigMaskField() {
+ // Validate that we can construct a Connector config containing the extended config for the transform
+ HashMap<String, String> connProps = new HashMap<>();
+ connProps.put("name", "foo");
+ connProps.put("connector.class", MockConnector.class.getName());
+ connProps.put("transforms", "example");
+ connProps.put("transforms.example.type", MaskField.Value.class.getName());
+ connProps.put("transforms.example.fields", "field");
+
+
+ Plugins plugins = null; // Safe when we're only constructing the config
+ new ConnectorConfig(plugins, connProps);
+ }
+
+ @Test
+ public void testEmbeddedConfigRegexRouter() {
+ // Validate that we can construct a Connector config containing the extended config for the transform
+ HashMap<String, String> connProps = new HashMap<>();
+ connProps.put("name", "foo");
+ connProps.put("connector.class", MockConnector.class.getName());
+ connProps.put("transforms", "example");
+ connProps.put("transforms.example.type", RegexRouter.class.getName());
+ connProps.put("transforms.example.regex", "(.*)");
+ connProps.put("transforms.example.replacement", "prefix-$1");
+
+ Plugins plugins = null; // Safe when we're only constructing the config
+ new ConnectorConfig(plugins, connProps);
+ }
+
+ @Test
+ public void testEmbeddedConfigReplaceField() {
+ // Validate that we can construct a Connector config containing the extended config for the transform
+ HashMap<String, String> connProps = new HashMap<>();
+ connProps.put("name", "foo");
+ connProps.put("connector.class", MockConnector.class.getName());
+ connProps.put("transforms", "example");
+ connProps.put("transforms.example.type", ReplaceField.Value.class.getName());
+
+ Plugins plugins = null; // Safe when we're only constructing the config
+ new ConnectorConfig(plugins, connProps);
+ }
+
+ @Test
+ public void testEmbeddedConfigSetSchemaMetadata() {
+ // Validate that we can construct a Connector config containing the extended config for the transform
+ HashMap<String, String> connProps = new HashMap<>();
+ connProps.put("name", "foo");
+ connProps.put("connector.class", MockConnector.class.getName());
+ connProps.put("transforms", "example");
+ connProps.put("transforms.example.type", SetSchemaMetadata.Value.class.getName());
+
+ Plugins plugins = null; // Safe when we're only constructing the config
+ new ConnectorConfig(plugins, connProps);
+ }
+
+ @Test
+ public void testEmbeddedConfigTimestampConverter() {
+ // Validate that we can construct a Connector config containing the extended config for the transform
+ HashMap<String, String> connProps = new HashMap<>();
+ connProps.put("name", "foo");
+ connProps.put("connector.class", MockConnector.class.getName());
+ connProps.put("transforms", "example");
+ connProps.put("transforms.example.type", TimestampConverter.Value.class.getName());
+ connProps.put("transforms.example.target.type", "unix");
+
+ Plugins plugins = null; // Safe when we're only constructing the config
+ new ConnectorConfig(plugins, connProps);
+ }
+
+ @Test
+ public void testEmbeddedConfigTimestampRouter() {
+ // Validate that we can construct a Connector config containing the extended config for the transform
+ HashMap<String, String> connProps = new HashMap<>();
+ connProps.put("name", "foo");
+ connProps.put("connector.class", MockConnector.class.getName());
+ connProps.put("transforms", "example");
+ connProps.put("transforms.example.type", TimestampRouter.class.getName());
+
+ Plugins plugins = null; // Safe when we're only constructing the config
+ new ConnectorConfig(plugins, connProps);
+ }
+
+ @Test
+ public void testEmbeddedConfigValueToKey() {
+ // Validate that we can construct a Connector config containing the extended config for the transform
+ HashMap<String, String> connProps = new HashMap<>();
+ connProps.put("name", "foo");
+ connProps.put("connector.class", MockConnector.class.getName());
+ connProps.put("transforms", "example");
+ connProps.put("transforms.example.type", ValueToKey.class.getName());
+ connProps.put("transforms.example.fields", "field");
+
+ Plugins plugins = null; // Safe when we're only constructing the config
+ new ConnectorConfig(plugins, connProps);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b760615f/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
index ce7d002..8557441 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -60,7 +60,7 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
public static final String FIELD_CONFIG = "field";
private static final String FIELD_DEFAULT = "";
- public static final String TYPE_CONFIG = "type";
+ public static final String TARGET_TYPE_CONFIG = "target.type";
public static final String FORMAT_CONFIG = "format";
private static final String FORMAT_DEFAULT = "";
@@ -68,7 +68,7 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FIELD_CONFIG, ConfigDef.Type.STRING, FIELD_DEFAULT, ConfigDef.Importance.HIGH,
"The field containing the timestamp, or empty if the entire value is a timestamp")
- .define(TYPE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+ .define(TARGET_TYPE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
"The desired timestamp representation: string, unix, Date, Time, or Timestamp")
.define(FORMAT_CONFIG, ConfigDef.Type.STRING, FORMAT_DEFAULT, ConfigDef.Importance.MEDIUM,
"A SimpleDateFormat-compatible format for the timestamp. Used to generate the output when type=string "
@@ -243,7 +243,7 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
public void configure(Map<String, ?> configs) {
final SimpleConfig simpleConfig = new SimpleConfig(CONFIG_DEF, configs);
final String field = simpleConfig.getString(FIELD_CONFIG);
- final String type = simpleConfig.getString(TYPE_CONFIG);
+ final String type = simpleConfig.getString(TARGET_TYPE_CONFIG);
String formatPattern = simpleConfig.getString(FORMAT_CONFIG);
schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
http://git-wip-us.apache.org/repos/asf/kafka/blob/b760615f/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
index 1b93874..12beef8 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
@@ -82,20 +82,20 @@ public class TimestampConverterTest {
@Test(expected = ConfigException.class)
public void testConfigInvalidTargetType() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "invalid"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "invalid"));
}
@Test(expected = ConfigException.class)
public void testConfigMissingFormat() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "string"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "string"));
}
@Test(expected = ConfigException.class)
public void testConfigInvalidFormat() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
Map<String, String> config = new HashMap<>();
- config.put(TimestampConverter.TYPE_CONFIG, "string");
+ config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
config.put(TimestampConverter.FORMAT_CONFIG, "bad-format");
xform.configure(config);
}
@@ -106,7 +106,7 @@ public class TimestampConverterTest {
@Test
public void testSchemalessIdentity() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
@@ -116,7 +116,7 @@ public class TimestampConverterTest {
@Test
public void testSchemalessTimestampToDate() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Date"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
@@ -126,7 +126,7 @@ public class TimestampConverterTest {
@Test
public void testSchemalessTimestampToTime() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Time"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
@@ -136,7 +136,7 @@ public class TimestampConverterTest {
@Test
public void testSchemalessTimestampToUnix() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "unix"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
@@ -147,7 +147,7 @@ public class TimestampConverterTest {
public void testSchemalessTimestampToString() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
Map<String, String> config = new HashMap<>();
- config.put(TimestampConverter.TYPE_CONFIG, "string");
+ config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xform.configure(config);
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
@@ -162,7 +162,7 @@ public class TimestampConverterTest {
@Test
public void testSchemalessDateToTimestamp() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE.getTime()));
assertNull(transformed.valueSchema());
@@ -173,7 +173,7 @@ public class TimestampConverterTest {
@Test
public void testSchemalessTimeToTimestamp() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, TIME.getTime()));
assertNull(transformed.valueSchema());
@@ -184,7 +184,7 @@ public class TimestampConverterTest {
@Test
public void testSchemalessUnixToTimestamp() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_UNIX));
assertNull(transformed.valueSchema());
@@ -195,7 +195,7 @@ public class TimestampConverterTest {
public void testSchemalessStringToTimestamp() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
Map<String, String> config = new HashMap<>();
- config.put(TimestampConverter.TYPE_CONFIG, "Timestamp");
+ config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xform.configure(config);
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_STRING));
@@ -210,7 +210,7 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaIdentity() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
@@ -220,7 +220,7 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaTimestampToDate() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Date"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Date.SCHEMA, transformed.valueSchema());
@@ -230,7 +230,7 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaTimestampToTime() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Time"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Time.SCHEMA, transformed.valueSchema());
@@ -240,7 +240,7 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaTimestampToUnix() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "unix"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Schema.INT64_SCHEMA, transformed.valueSchema());
@@ -251,7 +251,7 @@ public class TimestampConverterTest {
public void testWithSchemaTimestampToString() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
Map<String, String> config = new HashMap<>();
- config.put(TimestampConverter.TYPE_CONFIG, "string");
+ config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xform.configure(config);
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
@@ -266,7 +266,7 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaDateToTimestamp() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Date.SCHEMA, DATE.getTime()));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
@@ -277,7 +277,7 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaTimeToTimestamp() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Time.SCHEMA, TIME.getTime()));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
@@ -288,7 +288,7 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaUnixToTimestamp() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
@@ -299,7 +299,7 @@ public class TimestampConverterTest {
public void testWithSchemaStringToTimestamp() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
Map<String, String> config = new HashMap<>();
- config.put(TimestampConverter.TYPE_CONFIG, "Timestamp");
+ config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xform.configure(config);
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING));
@@ -315,7 +315,7 @@ public class TimestampConverterTest {
public void testSchemalessFieldConversion() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
Map<String, String> config = new HashMap<>();
- config.put(TimestampConverter.TYPE_CONFIG, "Date");
+ config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Date");
config.put(TimestampConverter.FIELD_CONFIG, "ts");
xform.configure(config);
@@ -330,7 +330,7 @@ public class TimestampConverterTest {
public void testWithSchemaFieldConversion() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
Map<String, String> config = new HashMap<>();
- config.put(TimestampConverter.TYPE_CONFIG, "Timestamp");
+ config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
config.put(TimestampConverter.FIELD_CONFIG, "ts");
xform.configure(config);
@@ -360,7 +360,7 @@ public class TimestampConverterTest {
@Test
public void testKey() {
TimestampConverter<SourceRecord> xform = new TimestampConverter.Key<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+ xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime(), null, null));
assertNull(transformed.keySchema());