You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Mohammed Ait Haddou <mo...@gmail.com> on 2020/05/20 17:16:56 UTC
Field does not exist on Debezium transformation to extract key
Hi, I am trying to create a debezium mysql connector with a transformation
to extract the key.
Before key transformations : create source connector mysql with(
"connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
"database.hostname" = 'mysql',
"tasks.max" = '1',
"database.port" = '3306',
"database.user" = 'debezium',
"database.password" = 'dbz',
"database.server.id" = '42',
"database.server.name" = 'before',
"table.whitelist" = 'deepprices.deepprices',
"database.history.kafka.bootstrap.servers" = 'kafka:29092',
"database.history.kafka.topic" = 'dbz.deepprices',
"include.schema.changes" = 'true',
"transforms" = 'unwrap',
"transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope'
);
Topic results are :
rowtime: 2020/05/20 16:47:23.354 Z, key:
[St@5778462697648631933/8247607644536792125],
value: {"id": "P195910", "price": "1511.64"} When the key.converter is set
to JSON, Key becomes {"id": "P195910"}
*I want to extract id from key and make it a string key :*
*Expected results* :
rowtime: 2020/05/20 16:47:23.354 Z, key: 'P195910', value: {"id":
"P195910", "price": "1511.64"}
While trying using a transformation "ExtractField" or "ValueToKey" I
get "DataException:
Field does not exist: id " :
drop connector mysql;
create source connector mysql with(
"connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
"database.hostname" = 'mysql',
"tasks.max" = '1',
"database.port" = '3306',
"database.user" = 'debezium',
"database.password" = 'dbz',
"database.server.id" = '42',
"database.server.name" = 'after',
"table.whitelist" = 'deepprices.deepprices',
"database.history.kafka.bootstrap.servers" = 'kafka:29092',
"database.history.kafka.topic" = 'dbz.deepprices',
"include.schema.changes" = 'true',
"key.converter" = 'org.apache.kafka.connect.json.JsonConverter',
"key.converter.schemas.enable" = 'TRUE',
"value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
"value.converter.schemas.enable" = 'TRUE',
"transforms" = 'unwrap,createkey',
"transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope',
"transforms.createkey.type" =
'org.apache.kafka.connect.transforms.ValueToKey',
"transforms.createkey.fields" = 'id'
);
--
Mohammed Ait Haddou
Linkedin.com/in/medait
+212697937189