Posted to jira@kafka.apache.org by "Fuxin Hao (Jira)" <ji...@apache.org> on 2023/01/31 07:25:00 UTC
[jira] [Updated] (KAFKA-14665) my custom SMT that converts Int to String does not work for primary keys
[ https://issues.apache.org/jira/browse/KAFKA-14665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fuxin Hao updated KAFKA-14665:
------------------------------
Description:
I'm using {{io.debezium.connector.postgresql.PostgresConnector}} and {{io.confluent.connect.jdbc.JdbcSinkConnector}} to sync data between two PostgreSQL databases, and I set {{time.precision.mode=adaptive}} in the [Debezium config|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types], which serializes PostgreSQL time and timestamp columns as {{Integer}} or {{Long}} values that {{JdbcSinkConnector}} cannot write back into timestamp columns. So I wrote an SMT to transform these fields from numeric types to strings.
Say I have the following table:
{code:java}
CREATE TABLE pk_created_at (
    created_at timestamp without time zone DEFAULT current_timestamp NOT NULL,
    PRIMARY KEY (created_at)
);

INSERT INTO pk_created_at VALUES (current_timestamp); {code}
My source connector configuration:
{code:java}
{
  "name": "test-connector",
  "config": {
    "snapshot.mode": "always",
    "plugin.name": "pgoutput",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "source",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "test",
    "database.server.name": "test",
    "slot.name": "test",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enabled": true,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enabled": true,
    "decimal.handling.mode": "string",
    "time.precision.mode": "adaptive",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
} {code}
And the messages in the Kafka topic {{test.public.pk_created_at}} would be:
{code:java}
# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "name": "io.debezium.time.MicroTimestamp",
        "version": 1,
        "field": "created_at"
      }
    ],
    "optional": false,
    "name": "test.public.pk_created_at.Value"
  },
  "payload": {
    "created_at": 1669354751764130
  }
}{code}
After applying my SMT, the messages would be like this:
{code:java}
# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": true,
        "field": "created_at"
      }
    ],
    "optional": false,
    "name": "test.public.pk_created_at.Value"
  },
  "payload": {
    "created_at": "2022-11-25T05:39:11.764130Z"
  }
}{code}
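For reference, the SMT is applied on the source side by appending it to the transform chain, roughly like this (the {{timestampconverter}} alias here is just an example; only the class name comes from the repository linked below):
{code:java}
"transforms": "unwrap,timestampconverter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.timestampconverter.type": "com.github.haofuxin.kafka.connect.DebeziumTimestampConverter"
{code}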
It worked great if {{created_at}} is not part of the primary key; no error occurred. But the primary keys on some of my tables are composed of {{id}} and {{created_at}}, like this: {{PRIMARY KEY (id, created_at)}}. In that case {{JdbcSinkConnector}} raised the exception below:
{code:java}
2022-11-25 06:57:01,450 INFO  ||  Attempting to open connection #1 to PostgreSql  [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2022-11-25 06:57:01,459 INFO  ||  Maximum table name length for database is 63 bytes  [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect]
2022-11-25 06:57:01,459 INFO  ||  JdbcDbWriter Connected  [io.confluent.connect.jdbc.sink.JdbcDbWriter]
2022-11-25 06:57:01,472 INFO  ||  Checking PostgreSql dialect for existence of TABLE "pk_created_at"  [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,484 INFO  ||  Using PostgreSql dialect TABLE "pk_created_at" present  [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,505 INFO  ||  Checking PostgreSql dialect for type of TABLE "pk_created_at"  [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,508 INFO  ||  Setting metadata for table "pk_created_at" to Table{name='"pk_created_at"', type=TABLE columns=[Column{'created_at', isPrimaryKey=true, allowsNull=false, sqlType=timestamp}]}  [io.confluent.connect.jdbc.util.TableDefinitions]
2022-11-25 06:57:01,510 WARN  ||  Write of 2 records failed, remainingRetries=0  [io.confluent.connect.jdbc.sink.JdbcSinkTask]
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "pk_created_at" ("created_at") VALUES (1669359291990398) ON CONFLICT ("created_at") DO NOTHING was aborted: ERROR: column "created_at" is of type timestamp without time zone but expression is of type bigint
  Hint: You will need to rewrite or cast the expression.
  Position: 52  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:871)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:910)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1638)
	at io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:196)
	at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:186)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.postgresql.util.PSQLException: ERROR: column "created_at" is of type timestamp without time zone but expression is of type bigint
  Hint: You will need to rewrite or cast the expression.
  Position: 52
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:315)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:868)
	... 17 more{code}
The error suggests the sink connector was still trying to insert {{created_at}} as the numeric value {{1669359291990398}}, even though I verified that the messages in the Kafka topic had been transformed into strings. It works if {{created_at}} is not a primary key.
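For anyone reproducing this, the record keys can be inspected alongside the values with the console consumer, e.g.:
{code:java}
# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at \
#     --from-beginning --property print.key=true --property key.separator=" | "{code}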
[My SMT|https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java]:
{code:java}
package com.github.haofuxin.kafka.connect;

import io.debezium.time.Date;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.Time;
import io.debezium.time.Timestamp;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;

public class DebeziumTimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final Logger LOG = LoggerFactory.getLogger(DebeziumTimestampConverter.class);
    private static final String PURPOSE = "convert io.debezium.time.MicroTimestamp into String";

    private Cache<Schema, Schema> schemaUpdateCache;

    @Override
    public void configure(Map<String, ?> props) {
        schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }

    // The transform operates on the record value, not the key.
    protected Schema operatingSchema(R record) {
        return record.valueSchema();
    }

    protected Object operatingValue(R record) {
        return record.value();
    }

    private String formatDate(Integer epoch) {
        if (epoch == null) {
            return "";
        }
        LocalDate d = LocalDate.ofEpochDay(epoch);
        return d.toString();
    }

    private String formatTime(Integer epoch) {
        if (epoch == null) {
            return "";
        }
        java.util.Date date = new java.util.Date(epoch);
        return new SimpleDateFormat("HH:mm:ss.SSS").format(date);
    }

    private String formatMicroTime(Long epochMicroSeconds) {
        if (epochMicroSeconds == null) {
            return "";
        }
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS").withZone(ZoneId.from(ZoneOffset.UTC));
        long epochSeconds = epochMicroSeconds / 1000000L;
        long nanoOffset = (epochMicroSeconds % 1000000L) * 1000L;
        Instant instant = Instant.ofEpochSecond(epochSeconds, nanoOffset);
        return formatter.format(instant);
    }

    private String formatTimestamp(Long epochMilliSeconds) {
        if (epochMilliSeconds == null) {
            return "";
        }
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneId.from(ZoneOffset.UTC));
        Instant instant = Instant.ofEpochMilli(epochMilliSeconds);
        return formatter.format(instant);
    }

    private String formatMicroTimestamp(Long epochMicroSeconds) {
        if (epochMicroSeconds == null) {
            return "";
        }
        long epochSeconds = epochMicroSeconds / 1000000L;
        long nanoOffset = (epochMicroSeconds % 1000000L) * 1000L;
        Instant instant = Instant.ofEpochSecond(epochSeconds, nanoOffset);
        return instant.toString();
    }

    // Copy the value schema, replacing every Debezium temporal field with an optional string.
    private Schema makeUpdatedSchema(Schema schema) {
        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
        for (Field field : schema.fields()) {
            if (field.schema().type() != Schema.Type.STRING && (
                    MicroTimestamp.SCHEMA_NAME.equals(field.schema().name()) ||
                    Date.SCHEMA_NAME.equals(field.schema().name()) ||
                    Time.SCHEMA_NAME.equals(field.schema().name()) ||
                    MicroTime.SCHEMA_NAME.equals(field.schema().name()) ||
                    Timestamp.SCHEMA_NAME.equals(field.schema().name()))) {
                builder.field(field.name(), Schema.OPTIONAL_STRING_SCHEMA);
            } else {
                builder.field(field.name(), field.schema());
            }
        }
        return builder.build();
    }

    // Rebuild the record with the original key and the converted value.
    protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                record.key(),
                updatedSchema,
                updatedValue,
                record.timestamp()
        );
    }

    private R applyWithSchema(R r) {
        final Struct struct = requireStruct(operatingValue(r), PURPOSE);
        Schema updatedSchema = schemaUpdateCache.get(struct.schema());
        if (updatedSchema == null) {
            updatedSchema = makeUpdatedSchema(struct.schema());
            schemaUpdateCache.put(struct.schema(), updatedSchema);
        }
        final Struct updatedValue = new Struct(updatedSchema);
        for (Field field : struct.schema().fields()) {
            if (field.schema().type() != Schema.Type.STRING && field.schema().name() != null) {
                switch (field.schema().name()) {
                    case Date.SCHEMA_NAME:
                        Object value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Integer) {
                            updatedValue.put(field.name(), formatDate((Integer) value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case Time.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Integer) {
                            updatedValue.put(field.name(), formatTime((Integer) value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case MicroTime.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Long) {
                            updatedValue.put(field.name(), formatMicroTime((Long) value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case Timestamp.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Long) {
                            updatedValue.put(field.name(), formatTimestamp((Long) value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case MicroTimestamp.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Long) {
                            updatedValue.put(field.name(), formatMicroTimestamp((Long) value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                }
            } else {
                updatedValue.put(field.name(), struct.get(field));
            }
        }
        return newRecord(r, updatedSchema, updatedValue);
    }

    @Override
    public R apply(R record) {
        if (operatingSchema(record) == null) {
            return record;
        } else {
            return applyWithSchema(record);
        }
    }
}{code}
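Note that {{operatingSchema}}/{{operatingValue}} above return the record's value schema and value, so only the message value is rewritten; the record key keeps the original {{int64}} micro-timestamp. If the sink derives primary-key columns from the record key (for example with {{pk.mode=record_key}}), a key-side variant would be needed as well. A minimal sketch, assuming the class above stays unchanged and using a hypothetical name {{DebeziumTimestampKeyConverter}}:
{code:java}
// Hypothetical key-side variant (not part of the SMT above): same conversion
// logic, but applied to the record key instead of the value.
public class DebeziumTimestampKeyConverter<R extends ConnectRecord<R>> extends DebeziumTimestampConverter<R> {

    @Override
    protected Schema operatingSchema(R record) {
        return record.keySchema();
    }

    @Override
    protected Object operatingValue(R record) {
        return record.key();
    }

    @Override
    protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
        // Replace the key; keep the value untouched.
        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                updatedSchema,
                updatedValue,
                record.valueSchema(),
                record.value(),
                record.timestamp()
        );
    }
}{code}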
> my custom SMT that converts Int to String does not work for primary keys
> ------------------------------------------------------------------------
>
> Key: KAFKA-14665
> URL: https://issues.apache.org/jira/browse/KAFKA-14665
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 3.1.0
> Reporter: Fuxin Hao
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)