You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Joost Molenaar <j....@gmail.com> on 2022/05/02 13:59:48 UTC

Flink-SQL returning duplicate rows for some records

Hello all,

I'm trying to use Flink-SQL to monitor a Kafka topic that's populated by
Debezium, which is in turn monitoring a MS-SQL CDC table. For some reason,
Flink-SQL shows a new row when I update the boolean field, but updates the
row in place when I update the text field, and I'm not understanding why
this happens. My ultimate goal is to use Flink-SQL to do a join on records
that come from both sides of a 1:N relation in the foreign database, to
expose a more ready to consume JSON object to downstream consumers.

The source table is defined like this in MS-SQL:

    CREATE TABLE todo_list (
        id int IDENTITY NOT NULL,
        done bit NOT NULL DEFAULT 0,
        name varchar(MAX) NOT NULL,
        CONSTRAINT PK_todo_list PRIMARY KEY (id)
    );

This is the configuration I'm sending to Debezium, note that I'm not
including the
JSON-schema in both keys and values:

    {
        "name": "todo-connector",
        "config": {
            "connector.class":
"io.debezium.connector.sqlserver.SqlServerConnector",
            "tasks.max": "1",
            "database.server.name": "mssql",
            "database.hostname": "10.88.10.1",
            "database.port": "1433",
            "database.user": "sa",
            "database.password": "...",
            "database.dbname": "todo",
            "database.history.kafka.bootstrap.servers": "10.88.10.10:9092",
            "database.history.kafka.topic": "schema-changes.todo",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": false,
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": false
        }
    }

So Debezium is publishing events to Kafka with keys like this:

    {"id":3}

And values like this (whitespace added for readability), this is updating the
value of the 'name' field:

    {
      "before": {
        "id": 3,
        "done": false,
        "name": "test"
      },
      "after": {
        "id": 3,
        "done": false,
        "name": "test2"
      },
      "source": {
        "version": "1.9.0.Final",
        "connector": "sqlserver",
        "name": "mssql",
        "ts_ms": 1651497653043,
        "snapshot": "false",
        "db": "todo",
        "sequence": null,
        "schema": "dbo",
        "table": "todo_list",
        "change_lsn": "00000025:00000d58:0002",
        "commit_lsn": "00000025:00000d58:0003",
        "event_serial_no": 2
      },
      "op": "u",
      "ts_ms": 1651497654127,
      "transaction": null
    }

(I verified this using a Python script that follows the relevant Kafka topic.)

Next, I'm trying to follow this CDC stream in Flink by adding the
Kafka connector
for Flink SQL, defining a source table and starting a job in the Flink-SQL CLI:

    ADD JAR '/opt/flink/opt/flink-sql-connector-kafka_2.11-1.14.4.jar';

    CREATE TABLE todo_list (
        k_id BIGINT,
        done BOOLEAN,
        name STRING
    )
    WITH (
        'connector'='kafka',
        'topic'='mssql.dbo.todo_list',
        'properties.bootstrap.servers'='10.88.10.10:9092',
        'properties.group.id'='flinksql-todo-list',
        'scan.startup.mode'='earliest-offset',
        'key.format'='json',
        'key.fields-prefix'='k_',
        'key.fields'='k_id',
        'value.format'='debezium-json',
        'value.debezium-json.schema-include'='false',
        'value.fields-include'='EXCEPT_KEY'
    );

    SELECT * FROM todo_list;

Now, when I perform a query like this in the MS-SQL database:

    UPDATE todo_list SET name='test2' WHERE id=3;

Now I see that the Flink-SQL client updates the row with id=3 to have the new
value "test2" for the 'name' field, as I was expecting. However, when I
duplicate the 'done' field to have a different value, Flink-SQL seems to leave
the old row with values (3, False, 'test2') intact, and shows a new row with
values (3, True, 'test2').

I tried to append a `PRIMARY KEY (k_id) NOT ENFORCED` line between the first
parentheses in the CREATE TABLE statement, but this seems to make no
difference, except when running `DESCRIBE todo_list` in Flink-SQL.

I have no idea why the boolean field would cause different behavior than the
text field. Am I missing some piece of configuration, are my expectations
wrong?


Regards,
Joost Molenaar

Re: Flink-SQL returning duplicate rows for some records

Posted by Joost Molenaar <j....@gmail.com>.
Hi Leonard and Martijn, thanks for looking into this.

I ran into the issue on Flink 1.14.4 (with the matching
flink-sql-connector-kafka based on Scala 2.11), but reproduced the problem
today in 1.15.0 (again with the matching flink-sql-connector-kafka). I haven't
used older versions than 1.14.4.

These following debezium-json messages illustrate the problem; note that
they're published without schema and that they're all produced to Kafka with
this message key:

    {"id":1}

These are the message values; first for an INSERT:

    {"before":null,"after":{"id":1,"done":false,"name":"Initial
value"},"source":{"version":"1.9.2.Final","connector":"sqlserver","name":"mssql","ts_ms":1652104409527,"snapshot":"false","db":"todo","sequence":null,"schema":"dbo","table":"todo_list","change_lsn":"00000025:00000528:001c","commit_lsn":"00000025:00000528:001d","event_serial_no":1},"op":"c","ts_ms":1652104413976,"transaction":null}

Then an UPDATE on the text field:

    {"before":{"id":1,"done":false,"name":"Initial
value"},"after":{"id":1,"done":false,"name":"Updated
#1"},"source":{"version":"1.9.2.Final","connector":"sqlserver","name":"mssql","ts_ms":1652104502837,"snapshot":"false","db":"todo","sequence":null,"schema":"dbo","table":"todo_list","change_lsn":"00000025:000005d8:0002","commit_lsn":"00000025:000005d8:0003","event_serial_no":2},"op":"u","ts_ms":1652104503260,"transaction":null}

Then an UPDATE on a boolean field -- this causes a duplicated row for id=1:

    {"before":{"id":1,"done":false,"name":""},"after":{"id":1,"done":true,"name":"Updated
#1"},"source":{"version":"1.9.2.Final","connector":"sqlserver","name":"mssql","ts_ms":1652104507080,"snapshot":"false","db":"todo","sequence":null,"schema":"dbo","table":"todo_list","change_lsn":"00000025:000005f0:0002","commit_lsn":"00000025:000005f0:0003","event_serial_no":2},"op":"u","ts_ms":1652104508248,"transaction":null}

Another UPDATE on the text field -- this causes an update the of text
field in the second instance of the id=1 row:

    {"before":{"id":1,"done":true,"name":"Updated
#1"},"after":{"id":1,"done":true,"name":"Updated
#2"},"source":{"version":"1.9.2.Final","connector":"sqlserver","name":"mssql","ts_ms":1652104511600,"snapshot":"false","db":"todo","sequence":null,"schema":"dbo","table":"todo_list","change_lsn":"00000025:00000608:0002","commit_lsn":"00000025:00000608:0003","event_serial_no":2},"op":"u","ts_ms":1652104513257,"transaction":null}

And finally a DELETE -- this causes the deletion of the second row
with id=1, but not the first:

    {"before":{"id":1,"done":true,"name":"Updated
#2"},"after":null,"source":{"version":"1.9.2.Final","connector":"sqlserver","name":"mssql","ts_ms":1652104514893,"snapshot":"false","db":"todo","sequence":null,"schema":"dbo","table":"todo_list","change_lsn":"00000025:00000620:0002","commit_lsn":"00000025:00000620:0005","event_serial_no":1},"op":"d","ts_ms":1652104518749,"transaction":null}

(Debezium then produces a tombstone record with the same key
`{"id":1}` and value `null`.)

For reference, this is the CREATE TABLE statement for the source connector::

    CREATE TABLE todo_list (
        id BIGINT,
        done BOOLEAN,
        name STRING
    )
    WITH (
        'connector'='kafka',
        'topic'='mssql.dbo.todo_list',
        'properties.bootstrap.servers'='10.88.10.10:9092',
        'properties.group.id'='flinksql-todo-list',
        'scan.startup.mode'='earliest-offset',
        'key.format'='json',
        'key.fields'='id',
        'value.format'='debezium-json',
        'value.debezium-json.schema-include'='false',
        'value.fields-include'='EXCEPT_KEY'
    );

Please let me know if there's anything else I can do to clear this up.

Kind regards,
Joost Molenaar

On Sat, 7 May 2022 at 10:26, Leonard Xu <xb...@gmail.com> wrote:
>
> Hi Joost
>
> Could you share your flink version and the two records in debezium-json format which produced by two MS SQL UPDATE statement ?
>
> Best,
> Leonard
>
> > 2022年5月2日 下午9:59,Joost Molenaar <j....@gmail.com> 写道:
> >
> > Hello all,
> >
> > I'm trying to use Flink-SQL to monitor a Kafka topic that's populated by
> > Debezium, which is in turn monitoring a MS-SQL CDC table. For some reason,
> > Flink-SQL shows a new row when I update the boolean field, but updates the
> > row in place when I update the text field, and I'm not understanding why
> > this happens. My ultimate goal is to use Flink-SQL to do a join on records
> > that come from both sides of a 1:N relation in the foreign database, to
> > expose a more ready to consume JSON object to downstream consumers.
> >
> > The source table is defined like this in MS-SQL:
> >
> >    CREATE TABLE todo_list (
> >        id int IDENTITY NOT NULL,
> >        done bit NOT NULL DEFAULT 0,
> >        name varchar(MAX) NOT NULL,
> >        CONSTRAINT PK_todo_list PRIMARY KEY (id)
> >    );
> >
> > This is the configuration I'm sending to Debezium, note that I'm not
> > including the
> > JSON-schema in both keys and values:
> >
> >    {
> >        "name": "todo-connector",
> >        "config": {
> >            "connector.class":
> > "io.debezium.connector.sqlserver.SqlServerConnector",
> >            "tasks.max": "1",
> >            "database.server.name": "mssql",
> >            "database.hostname": "10.88.10.1",
> >            "database.port": "1433",
> >            "database.user": "sa",
> >            "database.password": "...",
> >            "database.dbname": "todo",
> >            "database.history.kafka.bootstrap.servers": "10.88.10.10:9092",
> >            "database.history.kafka.topic": "schema-changes.todo",
> >            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
> >            "key.converter.schemas.enable": false,
> >            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> >            "value.converter.schemas.enable": false
> >        }
> >    }
> >
> > So Debezium is publishing events to Kafka with keys like this:
> >
> >    {"id":3}
> >
> > And values like this (whitespace added for readability), this is updating the
> > value of the 'name' field:
> >
> >    {
> >      "before": {
> >        "id": 3,
> >        "done": false,
> >        "name": "test"
> >      },
> >      "after": {
> >        "id": 3,
> >        "done": false,
> >        "name": "test2"
> >      },
> >      "source": {
> >        "version": "1.9.0.Final",
> >        "connector": "sqlserver",
> >        "name": "mssql",
> >        "ts_ms": 1651497653043,
> >        "snapshot": "false",
> >        "db": "todo",
> >        "sequence": null,
> >        "schema": "dbo",
> >        "table": "todo_list",
> >        "change_lsn": "00000025:00000d58:0002",
> >        "commit_lsn": "00000025:00000d58:0003",
> >        "event_serial_no": 2
> >      },
> >      "op": "u",
> >      "ts_ms": 1651497654127,
> >      "transaction": null
> >    }
> >
> > (I verified this using a Python script that follows the relevant Kafka topic.)
> >
> > Next, I'm trying to follow this CDC stream in Flink by adding the
> > Kafka connector
> > for Flink SQL, defining a source table and starting a job in the Flink-SQL CLI:
> >
> >    ADD JAR '/opt/flink/opt/flink-sql-connector-kafka_2.11-1.14.4.jar';
> >
> >    CREATE TABLE todo_list (
> >        k_id BIGINT,
> >        done BOOLEAN,
> >        name STRING
> >    )
> >    WITH (
> >        'connector'='kafka',
> >        'topic'='mssql.dbo.todo_list',
> >        'properties.bootstrap.servers'='10.88.10.10:9092',
> >        'properties.group.id'='flinksql-todo-list',
> >        'scan.startup.mode'='earliest-offset',
> >        'key.format'='json',
> >        'key.fields-prefix'='k_',
> >        'key.fields'='k_id',
> >        'value.format'='debezium-json',
> >        'value.debezium-json.schema-include'='false',
> >        'value.fields-include'='EXCEPT_KEY'
> >    );
> >
> >    SELECT * FROM todo_list;
> >
> > Now, when I perform a query like this in the MS-SQL database:
> >
> >    UPDATE todo_list SET name='test2' WHERE id=3;
> >
> > Now I see that the Flink-SQL client updates the row with id=3 to have the new
> > value "test2" for the 'name' field, as I was expecting. However, when I
> > duplicate the 'done' field to have a different value, Flink-SQL seems to leave
> > the old row with values (3, False, 'test2') intact, and shows a new row with
> > values (3, True, 'test2').
> >
> > I tried to append a `PRIMARY KEY (k_id) NOT ENFORCED` line between the first
> > parentheses in the CREATE TABLE statement, but this seems to make no
> > difference, except when running `DESCRIBE todo_list` in Flink-SQL.
> >
> > I have no idea why the boolean field would cause different behavior than the
> > text field. Am I missing some piece of configuration, are my expectations
> > wrong?
> >
> >
> > Regards,
> > Joost Molenaar
>

Re: Flink-SQL returning duplicate rows for some records

Posted by Leonard Xu <xb...@gmail.com>.
Hi Joost

Could you share your flink version and the two records in debezium-json format which produced by two MS SQL UPDATE statement ?

Best,
Leonard

> 2022年5月2日 下午9:59,Joost Molenaar <j....@gmail.com> 写道:
> 
> Hello all,
> 
> I'm trying to use Flink-SQL to monitor a Kafka topic that's populated by
> Debezium, which is in turn monitoring a MS-SQL CDC table. For some reason,
> Flink-SQL shows a new row when I update the boolean field, but updates the
> row in place when I update the text field, and I'm not understanding why
> this happens. My ultimate goal is to use Flink-SQL to do a join on records
> that come from both sides of a 1:N relation in the foreign database, to
> expose a more ready to consume JSON object to downstream consumers.
> 
> The source table is defined like this in MS-SQL:
> 
>    CREATE TABLE todo_list (
>        id int IDENTITY NOT NULL,
>        done bit NOT NULL DEFAULT 0,
>        name varchar(MAX) NOT NULL,
>        CONSTRAINT PK_todo_list PRIMARY KEY (id)
>    );
> 
> This is the configuration I'm sending to Debezium, note that I'm not
> including the
> JSON-schema in both keys and values:
> 
>    {
>        "name": "todo-connector",
>        "config": {
>            "connector.class":
> "io.debezium.connector.sqlserver.SqlServerConnector",
>            "tasks.max": "1",
>            "database.server.name": "mssql",
>            "database.hostname": "10.88.10.1",
>            "database.port": "1433",
>            "database.user": "sa",
>            "database.password": "...",
>            "database.dbname": "todo",
>            "database.history.kafka.bootstrap.servers": "10.88.10.10:9092",
>            "database.history.kafka.topic": "schema-changes.todo",
>            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
>            "key.converter.schemas.enable": false,
>            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
>            "value.converter.schemas.enable": false
>        }
>    }
> 
> So Debezium is publishing events to Kafka with keys like this:
> 
>    {"id":3}
> 
> And values like this (whitespace added for readability), this is updating the
> value of the 'name' field:
> 
>    {
>      "before": {
>        "id": 3,
>        "done": false,
>        "name": "test"
>      },
>      "after": {
>        "id": 3,
>        "done": false,
>        "name": "test2"
>      },
>      "source": {
>        "version": "1.9.0.Final",
>        "connector": "sqlserver",
>        "name": "mssql",
>        "ts_ms": 1651497653043,
>        "snapshot": "false",
>        "db": "todo",
>        "sequence": null,
>        "schema": "dbo",
>        "table": "todo_list",
>        "change_lsn": "00000025:00000d58:0002",
>        "commit_lsn": "00000025:00000d58:0003",
>        "event_serial_no": 2
>      },
>      "op": "u",
>      "ts_ms": 1651497654127,
>      "transaction": null
>    }
> 
> (I verified this using a Python script that follows the relevant Kafka topic.)
> 
> Next, I'm trying to follow this CDC stream in Flink by adding the
> Kafka connector
> for Flink SQL, defining a source table and starting a job in the Flink-SQL CLI:
> 
>    ADD JAR '/opt/flink/opt/flink-sql-connector-kafka_2.11-1.14.4.jar';
> 
>    CREATE TABLE todo_list (
>        k_id BIGINT,
>        done BOOLEAN,
>        name STRING
>    )
>    WITH (
>        'connector'='kafka',
>        'topic'='mssql.dbo.todo_list',
>        'properties.bootstrap.servers'='10.88.10.10:9092',
>        'properties.group.id'='flinksql-todo-list',
>        'scan.startup.mode'='earliest-offset',
>        'key.format'='json',
>        'key.fields-prefix'='k_',
>        'key.fields'='k_id',
>        'value.format'='debezium-json',
>        'value.debezium-json.schema-include'='false',
>        'value.fields-include'='EXCEPT_KEY'
>    );
> 
>    SELECT * FROM todo_list;
> 
> Now, when I perform a query like this in the MS-SQL database:
> 
>    UPDATE todo_list SET name='test2' WHERE id=3;
> 
> Now I see that the Flink-SQL client updates the row with id=3 to have the new
> value "test2" for the 'name' field, as I was expecting. However, when I
> duplicate the 'done' field to have a different value, Flink-SQL seems to leave
> the old row with values (3, False, 'test2') intact, and shows a new row with
> values (3, True, 'test2').
> 
> I tried to append a `PRIMARY KEY (k_id) NOT ENFORCED` line between the first
> parentheses in the CREATE TABLE statement, but this seems to make no
> difference, except when running `DESCRIBE todo_list` in Flink-SQL.
> 
> I have no idea why the boolean field would cause different behavior than the
> text field. Am I missing some piece of configuration, are my expectations
> wrong?
> 
> 
> Regards,
> Joost Molenaar


Re: Flink-SQL returning duplicate rows for some records

Posted by Martijn Visser <ma...@apache.org>.
Hi Joost,

I'm looping in Leonard and Jark who might be able to help out here.

Best regards,

Martijn

On Mon, 2 May 2022 at 16:01, Joost Molenaar <j....@gmail.com> wrote:

> Hello all,
>
> I'm trying to use Flink-SQL to monitor a Kafka topic that's populated by
> Debezium, which is in turn monitoring a MS-SQL CDC table. For some reason,
> Flink-SQL shows a new row when I update the boolean field, but updates the
> row in place when I update the text field, and I'm not understanding why
> this happens. My ultimate goal is to use Flink-SQL to do a join on records
> that come from both sides of a 1:N relation in the foreign database, to
> expose a more ready to consume JSON object to downstream consumers.
>
> The source table is defined like this in MS-SQL:
>
>     CREATE TABLE todo_list (
>         id int IDENTITY NOT NULL,
>         done bit NOT NULL DEFAULT 0,
>         name varchar(MAX) NOT NULL,
>         CONSTRAINT PK_todo_list PRIMARY KEY (id)
>     );
>
> This is the configuration I'm sending to Debezium, note that I'm not
> including the
> JSON-schema in both keys and values:
>
>     {
>         "name": "todo-connector",
>         "config": {
>             "connector.class":
> "io.debezium.connector.sqlserver.SqlServerConnector",
>             "tasks.max": "1",
>             "database.server.name": "mssql",
>             "database.hostname": "10.88.10.1",
>             "database.port": "1433",
>             "database.user": "sa",
>             "database.password": "...",
>             "database.dbname": "todo",
>             "database.history.kafka.bootstrap.servers": "10.88.10.10:9092
> ",
>             "database.history.kafka.topic": "schema-changes.todo",
>             "key.converter": "org.apache.kafka.connect.json.JsonConverter",
>             "key.converter.schemas.enable": false,
>             "value.converter":
> "org.apache.kafka.connect.json.JsonConverter",
>             "value.converter.schemas.enable": false
>         }
>     }
>
> So Debezium is publishing events to Kafka with keys like this:
>
>     {"id":3}
>
> And values like this (whitespace added for readability), this is updating
> the
> value of the 'name' field:
>
>     {
>       "before": {
>         "id": 3,
>         "done": false,
>         "name": "test"
>       },
>       "after": {
>         "id": 3,
>         "done": false,
>         "name": "test2"
>       },
>       "source": {
>         "version": "1.9.0.Final",
>         "connector": "sqlserver",
>         "name": "mssql",
>         "ts_ms": 1651497653043,
>         "snapshot": "false",
>         "db": "todo",
>         "sequence": null,
>         "schema": "dbo",
>         "table": "todo_list",
>         "change_lsn": "00000025:00000d58:0002",
>         "commit_lsn": "00000025:00000d58:0003",
>         "event_serial_no": 2
>       },
>       "op": "u",
>       "ts_ms": 1651497654127,
>       "transaction": null
>     }
>
> (I verified this using a Python script that follows the relevant Kafka
> topic.)
>
> Next, I'm trying to follow this CDC stream in Flink by adding the
> Kafka connector
> for Flink SQL, defining a source table and starting a job in the Flink-SQL
> CLI:
>
>     ADD JAR '/opt/flink/opt/flink-sql-connector-kafka_2.11-1.14.4.jar';
>
>     CREATE TABLE todo_list (
>         k_id BIGINT,
>         done BOOLEAN,
>         name STRING
>     )
>     WITH (
>         'connector'='kafka',
>         'topic'='mssql.dbo.todo_list',
>         'properties.bootstrap.servers'='10.88.10.10:9092',
>         'properties.group.id'='flinksql-todo-list',
>         'scan.startup.mode'='earliest-offset',
>         'key.format'='json',
>         'key.fields-prefix'='k_',
>         'key.fields'='k_id',
>         'value.format'='debezium-json',
>         'value.debezium-json.schema-include'='false',
>         'value.fields-include'='EXCEPT_KEY'
>     );
>
>     SELECT * FROM todo_list;
>
> Now, when I perform a query like this in the MS-SQL database:
>
>     UPDATE todo_list SET name='test2' WHERE id=3;
>
> Now I see that the Flink-SQL client updates the row with id=3 to have the
> new
> value "test2" for the 'name' field, as I was expecting. However, when I
> duplicate the 'done' field to have a different value, Flink-SQL seems to
> leave
> the old row with values (3, False, 'test2') intact, and shows a new row
> with
> values (3, True, 'test2').
>
> I tried to append a `PRIMARY KEY (k_id) NOT ENFORCED` line between the
> first
> parentheses in the CREATE TABLE statement, but this seems to make no
> difference, except when running `DESCRIBE todo_list` in Flink-SQL.
>
> I have no idea why the boolean field would cause different behavior than
> the
> text field. Am I missing some piece of configuration, are my expectations
> wrong?
>
>
> Regards,
> Joost Molenaar
>