You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Mohammed Ait Haddou <mo...@gmail.com> on 2020/05/20 17:16:56 UTC

Field does not exist on Debezium transformation to extract key

Hi, I am trying to create a debezium mysql connector with a transformation
to extract the key.

Before key transformations : create source connector mysql with(
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'before',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'true',
    "transforms" = 'unwrap',
    "transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope'
    );
Topic results are :
rowtime: 2020/05/20 16:47:23.354 Z, key:
[St@5778462697648631933/8247607644536792125],
value: {"id": "P195910", "price": "1511.64"} When the key.converter is set
to JSON, Key becomes {"id": "P195910"}
*I want to extract id from key and make it a string key :*

*Expected results* :
rowtime: 2020/05/20 16:47:23.354 Z, key: 'P195910', value: {"id":
"P195910", "price": "1511.64"}

While trying using a transformation "ExtractField" or "ValueToKey" I
get "DataException:
Field does not exist: id " :

drop connector mysql;
    create source connector mysql with(
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'after',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'true',
    "key.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "key.converter.schemas.enable" = 'TRUE',
    "value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "value.converter.schemas.enable" = 'TRUE',
    "transforms" = 'unwrap,createkey',
    "transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope',
    "transforms.createkey.type" =
'org.apache.kafka.connect.transforms.ValueToKey',
    "transforms.createkey.fields" = 'id'
    );

-- 
Mohammed Ait Haddou
Linkedin.com/in/medait
+212697937189