Posted to users@kafka.apache.org by Shiti Saxena <ss...@gmail.com> on 2016/02/04 17:30:21 UTC

Kafka Connect - SinkRecord schema

Hi,

I was trying to set up the following Kafka Connect pipeline:
JDBC Source -> Console Sink, with the JDBC source in bulk mode

The schema returned by SinkRecord.valueSchema() looks incorrect.
To inspect it, I modified FileStreamSinkTask's put method as follows:

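public void put(Collection<SinkRecord> sinkRecords) {
    for (SinkRecord record : sinkRecords) {
        log.trace("Writing line to {}: {}", logFilename(), record.value());
        // Print the name of every field in the value schema, one per line.
        for (Field field : record.valueSchema().fields()) {
            outputStream.println(field.name());
        }
        // Then print the value itself (a Struct, as the output below shows).
        outputStream.println(record.value());
    }
}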

Is this the correct way to get the schema or am I missing something?

I was expecting the output to be

id
name
email
department
modified
org.apache.kafka.connect.data.Struct@59d634b7


but it was

id
department
department
department
modified
org.apache.kafka.connect.data.Struct@59d634b7


I tried this with different tables and found that columns sharing the
same data type all appear under a single name (in the output above,
name, email and department are all reported as department).
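
To check other tables for this symptom without reading the output by eye,
a small helper along these lines could flag repeated names. This is only a
sketch: DuplicateFieldNames and find are hypothetical names, not part of the
Connect API, and the input list is assumed to have been collected from
record.valueSchema().fields() by calling name() on each Field.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical diagnostic helper; not part of Kafka Connect.
class DuplicateFieldNames {

    // Returns every name that occurs more than once, mapped to its count,
    // in the order the names were first seen.
    static Map<String, Integer> find(List<String> fieldNames) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String name : fieldNames) {
            counts.merge(name, 1, Integer::sum);
        }
        // Drop names that appear only once, leaving just the duplicates.
        counts.values().removeIf(count -> count < 2);
        return counts;
    }

    public static void main(String[] args) {
        // Field names as printed by the modified sink task in this post.
        List<String> observed = Arrays.asList(
                "id", "department", "department", "department", "modified");
        System.out.println(find(observed)); // prints {department=3}
    }
}
```

An empty result would mean every field name in the schema is distinct, which
is what I expected for this table.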

Can someone please tell me how to get the schema?

Or is this a bug in the Connect API? The schema of the SourceRecord
created by the JDBC Source Connector is correct.