Posted to dev@kafka.apache.org by Plotnikov Artem Igorevich <a....@tinkoff.ru> on 2017/09/07 17:43:43 UTC

Kafka Connect corrupts message schema while doing cast transformation

Hi there!

I have encountered a problem: Kafka Connect's Cast transformation loses schema information (specifically, the schema name) while casting types. I have reproduced it with the following test in org.apache.kafka.connect.transforms.CastTest on the current trunk branch:
```
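@Test
public void castWholeRecordValueWithSchemaBooleanAndTimestampField() {
    // Cast only the "int64" field to boolean; the "timestamp" field should pass through unchanged
    final Cast<SourceRecord> xform = new Cast.Value<>();
    xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64:boolean"));

    // Struct schema with a plain INT64 field and a Timestamp logical-type field (INT64 underneath)
    SchemaBuilder builder = SchemaBuilder.struct();
    builder.field("int64", Schema.INT64_SCHEMA);
    builder.field("timestamp", Timestamp.SCHEMA);
    Schema supportedTypesSchema = builder.build();

    Struct recordValue = new Struct(supportedTypesSchema);
    recordValue.put("int64", 64L);
    recordValue.put("timestamp", new java.sql.Timestamp(0L));

    // Currently throws DataException here, because the rebuilt "timestamp" field schema loses its name
    SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
            supportedTypesSchema, recordValue));

    // With a schema, the result is a Struct with an updated schema, not a schemaless Map
    Schema transformedSchema = transformed.valueSchema();
    Struct transformedValue = (Struct) transformed.value();
    assertEquals(Schema.Type.BOOLEAN, transformedSchema.field("int64").schema().type());
    assertEquals(Timestamp.LOGICAL_NAME, transformedSchema.field("timestamp").schema().name());
    assertEquals(true, transformedValue.get("int64"));
    assertEquals(new java.sql.Timestamp(0L), transformedValue.get("timestamp"));
}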
```
It fails with the following exception:
```
org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.sql.Timestamp for field: "null"
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:240)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:214)
    at org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152)
    at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108)
    at org.apache.kafka.connect.transforms.CastTest.castWholeRecordValueWithSchemaBooleanAndTimestampField(CastTest.java:380)
```
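This happens because Timestamp.SCHEMA has type INT64 and name "org.apache.kafka.connect.data.Timestamp", but the org.apache.kafka.connect.transforms.Cast#getOrBuildSchema method copies only the type of each field schema and leaves the name as null. The rebuilt "timestamp" field is therefore a plain INT64 schema, which rejects the java.sql.Timestamp value when Struct.put validates it.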
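A simplified, self-contained model of the difference follows. It deliberately does not use the Kafka Connect API: FieldSchema, buildTypeOnly, buildPreservingUncast and accepts are hypothetical stand-ins that model a field schema as a type plus an optional logical name. It contrasts rebuilding every field from its type alone (the behaviour described above) with reusing the original schema for fields that are not being cast. The latter is one possible shape of a fix, not necessarily the exact patch mentioned below.

```java
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of a Connect field schema (type + optional logical name).
// This is NOT the Kafka Connect API; it only mirrors the behaviour described above.
final class CastSchemaSketch {
    enum Type { INT64, BOOLEAN }

    static final class FieldSchema {
        final Type type;
        final String name; // logical type name, or null for a plain primitive schema

        FieldSchema(Type type, String name) {
            this.type = type;
            this.name = name;
        }
    }

    static final String TIMESTAMP_NAME = "org.apache.kafka.connect.data.Timestamp";

    // Buggy variant: every field is rebuilt from its type alone, so logical names are lost.
    static Map<String, FieldSchema> buildTypeOnly(Map<String, FieldSchema> fields, Map<String, Type> casts) {
        Map<String, FieldSchema> out = new LinkedHashMap<>();
        for (Map.Entry<String, FieldSchema> e : fields.entrySet()) {
            Type target = casts.getOrDefault(e.getKey(), e.getValue().type);
            out.put(e.getKey(), new FieldSchema(target, null));
        }
        return out;
    }

    // Fixed variant: fields that are not being cast keep their original schema object.
    static Map<String, FieldSchema> buildPreservingUncast(Map<String, FieldSchema> fields, Map<String, Type> casts) {
        Map<String, FieldSchema> out = new LinkedHashMap<>();
        for (Map.Entry<String, FieldSchema> e : fields.entrySet()) {
            Type target = casts.get(e.getKey());
            out.put(e.getKey(), target == null ? e.getValue() : new FieldSchema(target, null));
        }
        return out;
    }

    // Rough stand-in for value validation: a Timestamp-named INT64 expects a java.util.Date,
    // a plain INT64 expects a Long, and a BOOLEAN expects a Boolean.
    static boolean accepts(FieldSchema schema, Object value) {
        if (TIMESTAMP_NAME.equals(schema.name)) {
            return value instanceof Date;
        }
        return schema.type == Type.INT64 ? value instanceof Long : value instanceof Boolean;
    }

    // The record from the test above: "int64" cast to boolean, "timestamp" left untouched.
    static Map<String, FieldSchema> exampleFields() {
        Map<String, FieldSchema> fields = new LinkedHashMap<>();
        fields.put("int64", new FieldSchema(Type.INT64, null));
        fields.put("timestamp", new FieldSchema(Type.INT64, TIMESTAMP_NAME));
        return fields;
    }

    static Map<String, Type> exampleCasts() {
        Map<String, Type> casts = new LinkedHashMap<>();
        casts.put("int64", Type.BOOLEAN);
        return casts;
    }

    public static void main(String[] args) {
        Date ts = new Date(0L); // java.sql.Timestamp is a subclass of java.util.Date
        FieldSchema buggy = buildTypeOnly(exampleFields(), exampleCasts()).get("timestamp");
        FieldSchema fixed = buildPreservingUncast(exampleFields(), exampleCasts()).get("timestamp");
        System.out.println("type-only copy accepts timestamp: " + accepts(buggy, ts));
        System.out.println("preserving copy accepts timestamp: " + accepts(fixed, ts));
    }
}
```

Running main prints false for the type-only copy and true for the preserving copy, mirroring the DataException above and the behaviour one would expect after a fix. Reusing the untouched field schema, rather than rebuilding it, keeps the name, version, parameters and any other metadata in one step.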

In practice, this makes a connector fail when it exports rows from a database table that has a timestamp column alongside another column being cast (I have PostgreSQL connector settings that trigger exactly this).

Should I report a bug, or am I misunderstanding something? I have patched the source code locally to fix this for my particular case, and I am ready to prepare a more general patch if necessary.

I have also attached a git patch file with the failing test case.

Thanks,
Artem