Posted to user@flink.apache.org by Rex Fenley <Re...@remind101.com> on 2020/08/20 17:59:08 UTC

Debezium Flink EMR

Hi,

I'm trying to set up Flink with the Debezium CDC connector on AWS EMR.
However, EMR only supports Flink 1.10.0, whereas, from looking at the
documentation, the Debezium connector only arrived in Flink 1.11.0.

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html

I'm wondering what alternative solutions are available for connecting
Debezium to Flink? Is there an open source Debezium connector that works
with Flink 1.10.0? Could I potentially pull the code out for the 1.11.0
Debezium connector and compile it in my project using Flink 1.10.0 api?

For context, I plan on doing some fairly complicated long lived stateful
joins / materialization using the Table API over data ingested from
Postgres and possibly MySQL.

Appreciate any help, thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Debezium Flink EMR

Posted by Rex Fenley <Re...@remind101.com>.
This worked, thanks! Looking forward to the future releases :)

On Mon, Aug 31, 2020 at 5:06 PM Marta Paes Moreira <ma...@ververica.com>
wrote:

> Hey, Rex!
>
> This is likely due to the tombstone records that Debezium produces for
> DELETE operations (i.e. a record with the same key as the deleted row and a
> value of null). These are markers for Kafka to indicate that log
> compaction can remove all records for the given key, and the initial
> implementation of the debezium-format can't handle them. This issue is
> already documented (and solved) in [1].
>
> In the meantime, can you try adding "tombstones.on.delete": "false" to the
> configuration of your Debezium MySQL connector?
>
> Marta
> [1] https://issues.apache.org/jira/browse/FLINK-18705
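>
> For reference, a minimal sketch of where that property would go in the Kafka
> Connect registration request from the Debezium MySQL tutorial setup (the
> connector name and connection details below are placeholders, not taken from
> your setup):
>
> {
>   "name": "inventory-connector",
>   "config": {
>     "connector.class": "io.debezium.connector.mysql.MySqlConnector",
>     "database.hostname": "mysql",
>     "database.port": "3306",
>     "database.user": "debezium",
>     "database.password": "dbz",
>     "database.server.id": "184054",
>     "database.server.name": "dbserver1",
>     "database.whitelist": "inventory",
>     "database.history.kafka.bootstrap.servers": "kafka:9092",
>     "database.history.kafka.topic": "schema-changes.inventory",
>     "tombstones.on.delete": "false"
>   }
> }
>
> Registering the connector with this config (e.g. POSTing the JSON to the
> Kafka Connect REST endpoint at /connectors) should stop the tombstone
> records from being written to the topic.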
>
> On Tue, Sep 1, 2020 at 1:36 AM Rex Fenley <Re...@remind101.com> wrote:
>
>> Hi, getting so close but ran into another issue:
>>
>> Flink successfully reads changes from Debezium/Kafka and writes them to
>> Elasticsearch, but there's a problem with deletions. When I DELETE a row
>> from MySQL the deletion makes it successfully all the way to Elasticsearch
>> which is great, but then the taskmanager suddenly dies with a null pointer
>> exception. Inserts and Updates do not have the same problem. This seems
>> very odd. Any help would be much appreciated. Thanks!
>>
>> flink-taskmanager_1    | 2020-08-31 23:30:33,684 WARN
>>  org.apache.flink.runtime.taskmanager.Task                    [] - Source:
>> TableSourceScan(table=[[default_catalog, default_database,
>> topic_addresses]], fields=[id, customer_id, street, city, state, zip,
>> type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
>> fields=[id, customer_id, street, city, state, zip, type]) (1/2)
>> (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED.
>> flink-taskmanager_1    | java.lang.NullPointerException: null
>> flink-taskmanager_1    | at java.lang.String.<init>(String.java:566)
>> ~[?:1.8.0_265]
>> flink-taskmanager_1    | at
>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
>> ~[flink-json-1.11.1.jar:1.11.1]
>> flink-taskmanager_1    | at
>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
>> flink-taskmanager_1    | at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
>> flink-taskmanager_1    | at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
>> flink-taskmanager_1    | at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
>> flink-taskmanager_1    | at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> flink-taskmanager_1    | at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> flink-taskmanager_1    | at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> flink-taskmanager_1    | 2020-08-31 23:30:33,720 INFO
>>  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing
>> task resources for Source: TableSourceScan(table=[[default_catalog,
>> default_database, topic_addresses]], fields=[id, customer_id, street, city,
>> state, zip, type]) -> Sink:
>> Sink(table=[default_catalog.default_database.ESAddresses], fields=[id,
>> customer_id, street, city, state, zip, type]) (1/2)
>> (2b79917cb528f37fad7f636740d2fdd8).
>> flink-taskmanager_1    | 2020-08-31 23:30:33,728 INFO
>>  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] -
>> Un-registering task and sending final execution state FAILED to JobManager
>> for task Source: TableSourceScan(table=[[default_catalog, default_database,
>> topic_addresses]], fields=[id, customer_id, street, city, state, zip,
>> type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
>> fields=[id, customer_id, street, city, state, zip, type]) (1/2)
>> 2b79917cb528f37fad7f636740d2fdd8.
>> flink-jobmanager_1     | 2020-08-31 23:30:33,770 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
>> TableSourceScan(table=[[default_catalog, default_database,
>> topic_addresses]], fields=[id, customer_id, street, city, state, zip,
>> type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
>> fields=[id, customer_id, street, city, state, zip, type]) (1/2)
>> (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED on
>> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2e246b35.
>> flink-jobmanager_1     | java.lang.NullPointerException: null
>> flink-jobmanager_1     | at java.lang.String.<init>(String.java:566)
>> ~[?:1.8.0_265]
>> flink-jobmanager_1     | at
>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
>> ~[flink-json-1.11.1.jar:1.11.1]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>> ~[?:?]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>> ~[?:?]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>> ~[?:?]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>> ~[?:?]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>
>> On Mon, Aug 31, 2020 at 12:27 PM Rex Fenley <Re...@remind101.com> wrote:
>>
>>> Ah, my bad, thanks for pointing that out Arvid!
>>>
>>> On Mon, Aug 31, 2020 at 12:00 PM Arvid Heise <ar...@ververica.com>
>>> wrote:
>>>
>>>> Hi Rex,
>>>>
>>>> you still forgot
>>>>
>>>> 'debezium-json.schema-include' = 'true'
>>>>
>>>> Please reread my mail.
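>>>>
>>>> As a sketch (untested), the WITH clause of the topic_addresses DDL from
>>>> earlier in the thread would then become:
>>>>
>>>> ) WITH (
>>>>   'connector' = 'kafka',
>>>>   'topic' = 'dbserver1.inventory.addresses',
>>>>   'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>>   'properties.group.id' = 'testGroup',
>>>>   'format' = 'debezium-json',
>>>>   'debezium-json.schema-include' = 'true'
>>>> )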
>>>>
>>>>
>>>> On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley <Re...@remind101.com> wrote:
>>>>
>>>>> Thanks for the input, though I've certainly included a schema as is
>>>>> reflected earlier in this thread. Including here again
>>>>> ...
>>>>> tableEnv.executeSql("""
>>>>>   CREATE TABLE topic_addresses (
>>>>>     -- schema is totally the same to the MySQL "addresses" table
>>>>>     id INT,
>>>>>     customer_id INT,
>>>>>     street STRING,
>>>>>     city STRING,
>>>>>     state STRING,
>>>>>     zip STRING,
>>>>>     type STRING,
>>>>>     PRIMARY KEY (id) NOT ENFORCED
>>>>>   ) WITH (
>>>>>     'connector' = 'kafka',
>>>>>     'topic' = 'dbserver1.inventory.addresses',
>>>>>     'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>>>     'properties.group.id' = 'testGroup',
>>>>>     'format' = 'debezium-json' -- using debezium-json as the format
>>>>>   )
>>>>> """)
>>>>>
>>>>> val table = tableEnv.from("topic_addresses").select($"*")
>>>>> ...
>>>>>
>>>>> On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise <ar...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Rex,
>>>>>>
>>>>>> the connector expects a value without a schema, but the message
>>>>>> contains a schema. You can tell Flink that the schema is included as
>>>>>> written in the documentation [1].
>>>>>>
>>>>>> CREATE TABLE topic_products (
>>>>>>   -- schema is totally the same to the MySQL "products" table
>>>>>>   id BIGINT,
>>>>>>   name STRING,
>>>>>>   description STRING,
>>>>>>   weight DECIMAL(10, 2)) WITH (
>>>>>>  'connector' = 'kafka',
>>>>>>  'topic' = 'products_binlog',
>>>>>>  'properties.bootstrap.servers' = 'localhost:9092',
>>>>>>  'properties.group.id' = 'testGroup',
>>>>>>  'format' = 'debezium-json',
>>>>>>  'debezium-json.schema-include' = 'true')
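>>>>>>
>>>>>> For illustration, the two Kafka value shapes differ roughly like this
>>>>>> (field contents trimmed, sketch only):
>>>>>>
>>>>>>   with the schema included:  {"schema": {...}, "payload": {"before": null, "after": {...}, "op": "c", ...}}
>>>>>>   without it (the default):  {"before": null, "after": {...}, "op": "c", ...}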
>>>>>>
>>>>>> @Jark Wu <im...@gmail.com>, it would probably be good to make the
>>>>>> connector more robust and catch these types of misconfigurations.
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
>>>>>>
>>>>>> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <Re...@remind101.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Awesome, so that took me a step further. When running, however, I'm
>>>>>>> receiving an error. FYI, my docker-compose file is based on the Debezium
>>>>>>> MySQL tutorial, which can be found here:
>>>>>>> https://debezium.io/documentation/reference/1.2/tutorial.html
>>>>>>>
>>>>>>> Part of the stack trace:
>>>>>>>
>>>>>>> flink-jobmanager_1     | Caused by: java.io.IOException: Corrupt
>>>>>>> Debezium JSON message
>>>>>>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
>>>>>>> cool street","city":"Big
>>>>>>> City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
>>>>>>> ~[flink-json-1.11.1.jar:1.11.1]
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>>>>>>> ~[?:?]
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>>>>>>> ~[?:?]
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>>>>>> ~[?:?]
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>>>>>> ~[?:?]
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>>> flink-jobmanager_1     | Caused by: java.lang.NullPointerException
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115)
>>>>>>> ~[flink-json-1.11.1.jar:1.11.1]
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>>>>>>> ~[?:?]
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>>>>>>> ~[?:?]
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>>>>>> ~[?:?]
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>>>>>> ~[?:?]
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>>> flink-jobmanager_1     | at
>>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>>>
>>>>>>> On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <im...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> This is a known issue in 1.11.0, and has been fixed in 1.11.1.
>>>>>>>>
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jark
>>>>>>>>
>>>>>>>> On Fri, 28 Aug 2020 at 06:52, Rex Fenley <Re...@remind101.com> wrote:
>>>>>>>>
>>>>>>>>> Hi again!
>>>>>>>>>
>>>>>>>>> I'm testing this out locally in Docker on Flink 1.11 first to get my
>>>>>>>>> bearings before downgrading to 1.10 and figuring out how to replace the
>>>>>>>>> Debezium connector. However, I'm getting the following error
>>>>>>>>> ```
>>>>>>>>> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
>>>>>>>>> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> Any suggestions for me to fix this?
>>>>>>>>>
>>>>>>>>> code:
>>>>>>>>>
>>>>>>>>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>> val blinkStreamSettings =
>>>>>>>>>   EnvironmentSettings
>>>>>>>>>     .newInstance()
>>>>>>>>>     .useBlinkPlanner()
>>>>>>>>>     .inStreamingMode()
>>>>>>>>>     .build()
>>>>>>>>> val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)
>>>>>>>>>
>>>>>>>>> // Table from Debezium mysql example docker:
>>>>>>>>> // +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>>>> // | Field       | Type                                | Null | Key | Default | Extra          |
>>>>>>>>> // +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>>>> // | id          | int(11)                             | NO   | PRI | NULL    | auto_increment |
>>>>>>>>> // | customer_id | int(11)                             | NO   | MUL | NULL    |                |
>>>>>>>>> // | street      | varchar(255)                        | NO   |     | NULL    |                |
>>>>>>>>> // | city        | varchar(255)                        | NO   |     | NULL    |                |
>>>>>>>>> // | state       | varchar(255)                        | NO   |     | NULL    |                |
>>>>>>>>> // | zip         | varchar(255)                        | NO   |     | NULL    |                |
>>>>>>>>> // | type        | enum('SHIPPING','BILLING','LIVING') | NO   |     | NULL    |                |
>>>>>>>>> // +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>>>>
>>>>>>>>> tableEnv.executeSql("""
>>>>>>>>>   CREATE TABLE topic_addresses (
>>>>>>>>>     -- schema is totally the same to the MySQL "addresses" table
>>>>>>>>>     id INT,
>>>>>>>>>     customer_id INT,
>>>>>>>>>     street STRING,
>>>>>>>>>     city STRING,
>>>>>>>>>     state STRING,
>>>>>>>>>     zip STRING,
>>>>>>>>>     type STRING,
>>>>>>>>>     PRIMARY KEY (id) NOT ENFORCED
>>>>>>>>>   ) WITH (
>>>>>>>>>     'connector' = 'kafka',
>>>>>>>>>     'topic' = 'dbserver1.inventory.addresses',
>>>>>>>>>     'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>>>>>>>     'properties.group.id' = 'testGroup',
>>>>>>>>>     'format' = 'debezium-json' -- using debezium-json as the format
>>>>>>>>>   )
>>>>>>>>> """)
>>>>>>>>>
>>>>>>>>> val table = tableEnv.from("topic_addresses").select($"*")
>>>>>>>>>
>>>>>>>>> // Defining a PK automatically puts it in Upsert mode, which we want.
>>>>>>>>> // TODO: type should be a keyword, is that acceptable by the DDL?
>>>>>>>>> tableEnv.executeSql("""
>>>>>>>>>   CREATE TABLE ESAddresses (
>>>>>>>>>     id INT,
>>>>>>>>>     customer_id INT,
>>>>>>>>>     street STRING,
>>>>>>>>>     city STRING,
>>>>>>>>>     state STRING,
>>>>>>>>>     zip STRING,
>>>>>>>>>     type STRING,
>>>>>>>>>     PRIMARY KEY (id) NOT ENFORCED
>>>>>>>>>   ) WITH (
>>>>>>>>>     'connector' = 'elasticsearch-7',
>>>>>>>>>     'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
>>>>>>>>>     'index' = 'flinkaddresses',
>>>>>>>>>     'format' = 'json'
>>>>>>>>>   )
>>>>>>>>> """)
>>>>>>>>>
>>>>>>>>> table.executeInsert("ESAddresses").print()
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <Re...@remind101.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <im...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Regarding the performance difference, the proposed way will have
>>>>>>>>>>> one more stateful operator (deduplication) than the native 1.11 cdc
>>>>>>>>>>> support.
>>>>>>>>>>> The overhead of the deduplication operator is just similar to a
>>>>>>>>>>> simple group by aggregate (max on each non-key column).
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jark
>>>>>>>>>>>
>>>>>>>>>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <Re...@remind101.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thank you so much for the help!
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <
>>>>>>>>>>>> marta@ververica.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Yes — you'll get the full row in the payload; and you can also
>>>>>>>>>>>>> access the change operation, which might be useful in your case.
>>>>>>>>>>>>>
>>>>>>>>>>>>> About performance, I'm summoning Kurt and @Jark Wu
>>>>>>>>>>>>> <ja...@apache.org> to the thread, who will be able to give you
>>>>>>>>>>>>> a more complete answer and likely also some optimization tips for your
>>>>>>>>>>>>> specific use case.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Marta
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <Re...@remind101.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yup! This definitely helps and makes sense.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The 'after' payload comes with all data from the row right?
>>>>>>>>>>>>>> So essentially, for inserts and updates I can insert/replace data by pk,
>>>>>>>>>>>>>> for null values I just delete by pk, and then I can build out the rest of
>>>>>>>>>>>>>> my joins like normal.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Are there any performance implications of doing it this way
>>>>>>>>>>>>>> that is different from the out-of-the-box 1.11 solution?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
>>>>>>>>>>>>>> marta@ververica.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi, Rex.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Part of what enabled CDC support in Flink 1.11 was the
>>>>>>>>>>>>>>> refactoring of the table source interfaces (FLIP-95 [1]), and the new
>>>>>>>>>>>>>>> ScanTableSource [2], which allows emitting bounded/unbounded streams with
>>>>>>>>>>>>>>> insert, update and delete rows.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In theory, you could consume data generated with Debezium as
>>>>>>>>>>>>>>> regular JSON-encoded events before Flink 1.11 — there just wasn't a
>>>>>>>>>>>>>>> convenient way to really treat it as "changelog". As a workaround, what you
>>>>>>>>>>>>>>> can do in Flink 1.10 is process these messages as JSON and extract the
>>>>>>>>>>>>>>> "after" field from the payload, and then apply de-duplication [3] to keep
>>>>>>>>>>>>>>> only the last row.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The DDL for your source table would look something like:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> CREATE TABLE tablename (
>>>>>>>>>>>>>>>   ...
>>>>>>>>>>>>>>>   after ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>   'connector' = 'kafka',
>>>>>>>>>>>>>>>   'format' = 'json',
>>>>>>>>>>>>>>>   ...
>>>>>>>>>>>>>>> );
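>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> And the de-duplication step from [3] would then look roughly like this
>>>>>>>>>>>>>>> (sketch only; it assumes a processing-time attribute `proctime` declared
>>>>>>>>>>>>>>> on the source table, and that `field1` is the primary key):
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> SELECT field1, field2
>>>>>>>>>>>>>>> FROM (
>>>>>>>>>>>>>>>   SELECT
>>>>>>>>>>>>>>>     `after`.field1 AS field1,
>>>>>>>>>>>>>>>     `after`.field2 AS field2,
>>>>>>>>>>>>>>>     ROW_NUMBER() OVER (PARTITION BY `after`.field1 ORDER BY proctime DESC) AS rownum
>>>>>>>>>>>>>>>   FROM tablename
>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>> WHERE rownum = 1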
>>>>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Marta
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>>>>>>>>>>> [3]
>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <
>>>>>>>>>>>>>>> chesnay@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @Jark Would it be possible to use the 1.11 debezium support
>>>>>>>>>>>>>>>> in 1.10?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm trying to set up Flink with Debezium CDC Connector on
>>>>>>>>>>>>>>>> AWS EMR, however, EMR only supports Flink 1.10.0, whereas Debezium
>>>>>>>>>>>>>>>> Connector arrived in Flink 1.11.0, from looking at the documentation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>>>>>>>>>>> connecting Debezium to Flink? Is there an open source Debezium connector
>>>>>>>>>>>>>>>> that works with Flink 1.10.0? Could I potentially pull the code out for the
>>>>>>>>>>>>>>>> 1.11.0 Debezium connector and compile it in my project using Flink 1.10.0
>>>>>>>>>>>>>>>> api?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For context, I plan on doing some fairly complicated long
>>>>>>>>>>>>>>>> lived stateful joins / materialization using the Table API over data
>>>>>>>>>>>>>>>> ingested from Postgres and possibly MySQL.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Appreciate any help, thanks!
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Debezium Flink EMR

Posted by Marta Paes Moreira <ma...@ververica.com>.
Hey, Rex!

This is likely due to the tombstone records that Debezium produces for
DELETE operations (i.e. a record with the same key as the deleted row and a
value of null). These are markers for Kafka to indicate that log compaction
can remove all records for the given key, and the initial implementation of
the debezium-format can't handle them. This issue is already documented
(and solved) in [1].

In the meantime, can you try adding "tombstones.on.delete":false" to the
configuration of your Debezium MySQL connector? Marta
[1] https://issues.apache.org/jira/browse/FLINK-18705

On Tue, Sep 1, 2020 at 1:36 AM Rex Fenley <Re...@remind101.com> wrote:

> Hi, getting so close but ran into another issue:
>
> Flink successfully reads changes from Debezium/Kafka and writes them to
> Elasticsearch, but there's a problem with deletions. When I DELETE a row
> from MySQL the deletion makes it successfully all the way to Elasticsearch
> which is great, but then the taskmanager suddenly dies with a null pointer
> exception. Inserts and Updates do not have the same problem. This seems
> very odd. Any help would be much appreciated. Thanks!
>
> flink-taskmanager_1    | 2020-08-31 23:30:33,684 WARN
>  org.apache.flink.runtime.taskmanager.Task                    [] - Source:
> TableSourceScan(table=[[default_catalog, default_database,
> topic_addresses]], fields=[id, customer_id, street, city, state, zip,
> type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
> fields=[id, customer_id, street, city, state, zip, type]) (1/2)
> (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED.
> flink-taskmanager_1    | java.lang.NullPointerException: null
> flink-taskmanager_1    | at java.lang.String.<init>(String.java:566)
> ~[?:1.8.0_265]
> flink-taskmanager_1    | at
> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
> ~[flink-json-1.11.1.jar:1.11.1]
> flink-taskmanager_1    | at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1    | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1    | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1    | at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1    | at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-taskmanager_1    | at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-taskmanager_1    | at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-taskmanager_1    | 2020-08-31 23:30:33,720 INFO
>  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing
> task resources for Source: TableSourceScan(table=[[default_catalog,
> default_database, topic_addresses]], fields=[id, customer_id, street, city,
> state, zip, type]) -> Sink:
> Sink(table=[default_catalog.default_database.ESAddresses], fields=[id,
> customer_id, street, city, state, zip, type]) (1/2)
> (2b79917cb528f37fad7f636740d2fdd8).
> flink-taskmanager_1    | 2020-08-31 23:30:33,728 INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] -
> Un-registering task and sending final execution state FAILED to JobManager
> for task Source: TableSourceScan(table=[[default_catalog, default_database,
> topic_addresses]], fields=[id, customer_id, street, city, state, zip,
> type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
> fields=[id, customer_id, street, city, state, zip, type]) (1/2)
> 2b79917cb528f37fad7f636740d2fdd8.
> flink-jobmanager_1     | 2020-08-31 23:30:33,770 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
> TableSourceScan(table=[[default_catalog, default_database,
> topic_addresses]], fields=[id, customer_id, street, city, state, zip,
> type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
> fields=[id, customer_id, street, city, state, zip, type]) (1/2)
> (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED on
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2e246b35.
> flink-jobmanager_1     | java.lang.NullPointerException: null
> flink-jobmanager_1     | at java.lang.String.<init>(String.java:566)
> ~[?:1.8.0_265]
> flink-jobmanager_1     | at
> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
> ~[flink-json-1.11.1.jar:1.11.1]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>
> On Mon, Aug 31, 2020 at 12:27 PM Rex Fenley <Re...@remind101.com> wrote:
>
>> Ah, my bad, thanks for pointing that out Arvid!
>>
>> On Mon, Aug 31, 2020 at 12:00 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Rex,
>>>
>>> you still forgot
>>>
>>> 'debezium-json.schema-include' = true
>>>
>>> Please reread my mail.
>>>
>>>
>>> On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley <Re...@remind101.com> wrote:
>>>
>>>> Thanks for the input, though I've certainly included a schema as is
>>>> reflected earlier in this thread. Including here again
>>>> ...
>>>> tableEnv.executeSql("""
>>>> CREATE TABLE topic_addresses (
>>>> -- schema is totally the same to the MySQL "addresses" table
>>>> id INT,
>>>> customer_id INT,
>>>> street STRING,
>>>> city STRING,
>>>> state STRING,
>>>> zip STRING,
>>>> type STRING,
>>>> PRIMARY KEY (id) NOT ENFORCED
>>>> ) WITH (
>>>> 'connector' = 'kafka',
>>>> 'topic' = 'dbserver1.inventory.addresses',
>>>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>> 'properties.group.id' = 'testGroup',
>>>> 'format' = 'debezium-json' -- using debezium-json as the format
>>>> )
>>>> """)
>>>>
>>>> val table = tableEnv.from("topic_addresses").select($"*")
>>>> ...
>>>>
>>>> On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi Rex,
>>>>>
>>>>> the connector expects a value without a schema, but the message
>>>>> contains a schema. You can tell Flink that the schema is included as
>>>>> written in the documentation [1].
>>>>>
>>>>> CREATE TABLE topic_products (
>>>>>   -- schema is totally the same to the MySQL "products" table
>>>>>   id BIGINT,
>>>>>   name STRING,
>>>>>   description STRING,
>>>>>   weight DECIMAL(10, 2)) WITH (
>>>>>  'connector' = 'kafka',
>>>>>  'topic' = 'products_binlog',
>>>>>  'properties.bootstrap.servers' = 'localhost:9092',
>>>>>  'properties.group.id' = 'testGroup',
>>>>>  'format' = 'debezium-json',
>>>>>  'debezium-json.schema-include' = true)
>>>>>
>>>>> @Jark Wu <im...@gmail.com> , it would be probably good to make the
>>>>> connector more robust and catch these types of misconfigurations.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
>>>>>
>>>>> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <Re...@remind101.com> wrote:
>>>>>
>>>>>> Awesome, so that took me a step further. When running i'm receiving
>>>>>> an error however. FYI, my docker-compose file is based on the Debezium
>>>>>> mysql tutorial which can be found here
>>>>>> https://debezium.io/documentation/reference/1.2/tutorial.html
>>>>>>
>>>>>> Part of the stack trace:
>>>>>>
>>>>>> flink-jobmanager_1     | Caused by: java.io.IOException: Corrupt
>>>>>> Debezium JSON message
>>>>>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
>>>>>> cool street","city":"Big
>>>>>> City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
>>>>>> ~[flink-json-1.11.1.jar:1.11.1]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>> flink-jobmanager_1     | Caused by: java.lang.NullPointerException
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115)
>>>>>> ~[flink-json-1.11.1.jar:1.11.1]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>>
>>>>>> On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <im...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> This is a known issue in 1.11.0, and has been fixed in 1.11.1.
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Fri, 28 Aug 2020 at 06:52, Rex Fenley <Re...@remind101.com> wrote:
>>>>>>>
>>>>>>>> Hi again!
>>>>>>>>
>>>>>>>> I'm tested out locally in docker on Flink 1.11 first to get my
>>>>>>>> bearings before downgrading to 1.10 and figuring out how to replace the
>>>>>>>> Debezium connector. However, I'm getting the following error
>>>>>>>> ```
>>>>>>>> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
>>>>>>>> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
>>>>>>>> ```
>>>>>>>>
>>>>>>>> Any suggestions for me to fix this?
>>>>>>>>
>>>>>>>> code:
>>>>>>>>
>>>>>>>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>> val blinkStreamSettings =
>>>>>>>> EnvironmentSettings
>>>>>>>> .newInstance()
>>>>>>>> .useBlinkPlanner()
>>>>>>>> .inStreamingMode()
>>>>>>>> .build()
>>>>>>>> val tableEnv = StreamTableEnvironment.create(bsEnv,
>>>>>>>> blinkStreamSettings)
>>>>>>>>
>>>>>>>> // Table from Debezium mysql example docker:
>>>>>>>> //
>>>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>>> // | Field | Type | Null | Key | Default | Extra |
>>>>>>>> //
>>>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>>> // | id | int(11) | NO | PRI | NULL | auto_increment |
>>>>>>>> // | customer_id | int(11) | NO | MUL | NULL | |
>>>>>>>> // | street | varchar(255) | NO | | NULL | |
>>>>>>>> // | city | varchar(255) | NO | | NULL | |
>>>>>>>> // | state | varchar(255) | NO | | NULL | |
>>>>>>>> // | zip | varchar(255) | NO | | NULL | |
>>>>>>>> // | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | |
>>>>>>>> //
>>>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>>>
>>>>>>>> tableEnv.executeSql("""
>>>>>>>> CREATE TABLE topic_addresses (
>>>>>>>> -- schema is totally the same to the MySQL "addresses" table
>>>>>>>> id INT,
>>>>>>>> customer_id INT,
>>>>>>>> street STRING,
>>>>>>>> city STRING,
>>>>>>>> state STRING,
>>>>>>>> zip STRING,
>>>>>>>> type STRING,
>>>>>>>> PRIMARY KEY (id) NOT ENFORCED
>>>>>>>> ) WITH (
>>>>>>>> 'connector' = 'kafka',
>>>>>>>> 'topic' = 'dbserver1.inventory.addresses',
>>>>>>>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>>>>>> 'properties.group.id' = 'testGroup',
>>>>>>>> 'format' = 'debezium-json' -- using debezium-json as the format
>>>>>>>> )
>>>>>>>> """)
>>>>>>>>
>>>>>>>> val table = tableEnv.from("topic_addresses").select($"*")
>>>>>>>>
>>>>>>>> // Defining a PK automatically puts it in Upsert mode, which we
>>>>>>>> want.
>>>>>>>> // TODO: type should be a keyword, is that acceptable by the DDL?
>>>>>>>> tableEnv.executeSql("""
>>>>>>>> CREATE TABLE ESAddresses (
>>>>>>>> id INT,
>>>>>>>> customer_id INT,
>>>>>>>> street STRING,
>>>>>>>> city STRING,
>>>>>>>> state STRING,
>>>>>>>> zip STRING,
>>>>>>>> type STRING,
>>>>>>>> PRIMARY KEY (id) NOT ENFORCED
>>>>>>>> ) WITH (
>>>>>>>> 'connector' = 'elasticsearch-7',
>>>>>>>> 'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
>>>>>>>> 'index' = 'flinkaddresses',
>>>>>>>> 'format' = 'json'
>>>>>>>> )
>>>>>>>> """)
>>>>>>>>
>>>>>>>> table.executeInsert("ESAddresses").print()
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <Re...@remind101.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <im...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Regarding the performance difference, the proposed way will have
>>>>>>>>>> one more stateful operator (deduplication) than the native 1.11 cdc
>>>>>>>>>> support.
>>>>>>>>>> The overhead of the deduplication operator is just similar to a
>>>>>>>>>> simple group by aggregate (max on each non-key column).
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jark
>>>>>>>>>>
>>>>>>>>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <Re...@remind101.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thank you so much for the help!
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <
>>>>>>>>>>> marta@ververica.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Yes — you'll get the full row in the payload; and you can also
>>>>>>>>>>>> access the change operation, which might be useful in your case.
>>>>>>>>>>>>
>>>>>>>>>>>> About performance, I'm summoning Kurt and @Jark Wu
>>>>>>>>>>>> <ja...@apache.org> to the thread, who will be able to give you
>>>>>>>>>>>> a more complete answer and likely also some optimization tips for your
>>>>>>>>>>>> specific use case.
>>>>>>>>>>>>
>>>>>>>>>>>> Marta
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <Re...@remind101.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Yup! This definitely helps and makes sense.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The 'after' payload comes with all data from the row right? So
>>>>>>>>>>>>> essentially inserts and updates I can insert/replace data by pk and null
>>>>>>>>>>>>> values I just delete by pk, and then I can build out the rest of my joins
>>>>>>>>>>>>> like normal.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Are there any performance implications of doing it this way
>>>>>>>>>>>>> that is different from the out-of-the-box 1.11 solution?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
>>>>>>>>>>>>> marta@ververica.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi, Rex.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Part of what enabled CDC support in Flink 1.11 was the
>>>>>>>>>>>>>> refactoring of the table source interfaces (FLIP-95 [1]), and the new
>>>>>>>>>>>>>> ScanTableSource [2], which allows to emit bounded/unbounded streams with
>>>>>>>>>>>>>> insert, update and delete rows.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In theory, you could consume data generated with Debezium as
>>>>>>>>>>>>>> regular JSON-encoded events before Flink 1.11 — there just wasn't a
>>>>>>>>>>>>>> convenient way to really treat it as "changelog". As a workaround, what you
>>>>>>>>>>>>>> can do in Flink 1.10 is process these messages as JSON and extract the
>>>>>>>>>>>>>> "after" field from the payload, and then apply de-duplication [3] to keep
>>>>>>>>>>>>>> only the last row.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The DDL for your source table would look something like:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> CREATE TABLE tablename ( *... * after ROW(`field1` DATATYPE,
>>>>>>>>>>>>>> `field2` DATATYPE, ...) ) WITH ( 'connector' = 'kafka',
>>>>>>>>>>>>>> 'format' = 'json', ... );
>>>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Marta
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>>>>>>>>>> [3]
>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <
>>>>>>>>>>>>>> chesnay@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Jark Would it be possible to use the 1.11 debezium support
>>>>>>>>>>>>>>> in 1.10?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm trying to set up Flink with Debezium CDC Connector on
>>>>>>>>>>>>>>> AWS EMR, however, EMR only supports Flink 1.10.0, whereas Debezium
>>>>>>>>>>>>>>> Connector arrived in Flink 1.11.0, from looking at the documentation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>>>>>>>>>> connecting Debezium to Flink? Is there an open source Debezium connector
>>>>>>>>>>>>>>> that works with Flink 1.10.0? Could I potentially pull the code out for the
>>>>>>>>>>>>>>> 1.11.0 Debezium connector and compile it in my project using Flink 1.10.0
>>>>>>>>>>>>>>> api?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For context, I plan on doing some fairly complicated long
>>>>>>>>>>>>>>> lived stateful joins / materialization using the Table API over data
>>>>>>>>>>>>>>> ingested from Postgres and possibly MySQL.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Appreciate any help, thanks!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>

Re: Debezium Flink EMR

Posted by Rex Fenley <Re...@remind101.com>.
Hi, getting so close but ran into another issue:

Flink successfully reads changes from Debezium/Kafka and writes them to
Elasticsearch, but there's a problem with deletions. When I DELETE a row
from MySQL, the deletion makes it all the way to Elasticsearch, which is
great, but then the taskmanager suddenly dies with a null pointer
exception. Inserts and updates do not have the same problem. This seems
very odd. Any help would be much appreciated. Thanks!
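
For reference, the failure is triggered by a plain DELETE against a table
Debezium is watching, e.g. (reusing the example row from earlier in this thread):

DELETE FROM inventory.addresses WHERE id = 18;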

flink-taskmanager_1    | 2020-08-31 23:30:33,684 WARN
 org.apache.flink.runtime.taskmanager.Task                    [] - Source:
TableSourceScan(table=[[default_catalog, default_database,
topic_addresses]], fields=[id, customer_id, street, city, state, zip,
type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
fields=[id, customer_id, street, city, state, zip, type]) (1/2)
(2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED.
flink-taskmanager_1    | java.lang.NullPointerException: null
flink-taskmanager_1    | at java.lang.String.<init>(String.java:566)
~[?:1.8.0_265]
flink-taskmanager_1    | at
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
~[flink-json-1.11.1.jar:1.11.1]
flink-taskmanager_1    | at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1    | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1    | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1    | at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1    | at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1    | at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1    | at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1    | 2020-08-31 23:30:33,720 INFO
 org.apache.flink.runtime.taskmanager.Task                    [] - Freeing
task resources for Source: TableSourceScan(table=[[default_catalog,
default_database, topic_addresses]], fields=[id, customer_id, street, city,
state, zip, type]) -> Sink:
Sink(table=[default_catalog.default_database.ESAddresses], fields=[id,
customer_id, street, city, state, zip, type]) (1/2)
(2b79917cb528f37fad7f636740d2fdd8).
flink-taskmanager_1    | 2020-08-31 23:30:33,728 INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor           [] -
Un-registering task and sending final execution state FAILED to JobManager
for task Source: TableSourceScan(table=[[default_catalog, default_database,
topic_addresses]], fields=[id, customer_id, street, city, state, zip,
type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
fields=[id, customer_id, street, city, state, zip, type]) (1/2)
2b79917cb528f37fad7f636740d2fdd8.
flink-jobmanager_1     | 2020-08-31 23:30:33,770 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
TableSourceScan(table=[[default_catalog, default_database,
topic_addresses]], fields=[id, customer_id, street, city, state, zip,
type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
fields=[id, customer_id, street, city, state, zip, type]) (1/2)
(2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED on
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2e246b35.
flink-jobmanager_1     | java.lang.NullPointerException: null
flink-jobmanager_1     | at java.lang.String.<init>(String.java:566)
~[?:1.8.0_265]
flink-jobmanager_1     | at
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
~[flink-dist_2.12-1.11.1.jar:1.11.1]

On Mon, Aug 31, 2020 at 12:27 PM Rex Fenley <Re...@remind101.com> wrote:

> Ah, my bad, thanks for pointing that out Arvid!
>
> On Mon, Aug 31, 2020 at 12:00 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Rex,
>>
>> you still forgot
>>
>> 'debezium-json.schema-include' = 'true'
>>
>> Please reread my mail.
>>
>>
>> On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley <Re...@remind101.com> wrote:
>>
>>> Thanks for the input, though I've certainly included a schema as is
>>> reflected earlier in this thread. Including here again
>>> ...
>>> tableEnv.executeSql("""
>>> CREATE TABLE topic_addresses (
>>> -- schema is totally the same to the MySQL "addresses" table
>>> id INT,
>>> customer_id INT,
>>> street STRING,
>>> city STRING,
>>> state STRING,
>>> zip STRING,
>>> type STRING,
>>> PRIMARY KEY (id) NOT ENFORCED
>>> ) WITH (
>>> 'connector' = 'kafka',
>>> 'topic' = 'dbserver1.inventory.addresses',
>>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>> 'properties.group.id' = 'testGroup',
>>> 'format' = 'debezium-json' -- using debezium-json as the format
>>> )
>>> """)
>>>
>>> val table = tableEnv.from("topic_addresses").select($"*")
>>> ...
>>>
>>> On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Rex,
>>>>
>>>> the connector expects a value without a schema, but the message
>>>> contains a schema. You can tell Flink that the schema is included as
>>>> written in the documentation [1].
>>>>
>>>> CREATE TABLE topic_products (
>>>>   -- schema is totally the same to the MySQL "products" table
>>>>   id BIGINT,
>>>>   name STRING,
>>>>   description STRING,
>>>>   weight DECIMAL(10, 2)
>>>> ) WITH (
>>>>   'connector' = 'kafka',
>>>>   'topic' = 'products_binlog',
>>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>>   'properties.group.id' = 'testGroup',
>>>>   'format' = 'debezium-json',
>>>>   'debezium-json.schema-include' = 'true'
>>>> )
>>>>
>>>> @Jark Wu <im...@gmail.com> , it would be probably good to make the
>>>> connector more robust and catch these types of misconfigurations.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
>>>>
>>>> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <Re...@remind101.com> wrote:
>>>>
>>>>> Awesome, so that took me a step further. When running it, I'm receiving an
>>>>> error, however. FYI, my docker-compose file is based on the Debezium MySQL
>>>>> tutorial, which can be found here:
>>>>> https://debezium.io/documentation/reference/1.2/tutorial.html
>>>>>
>>>>> Part of the stack trace:
>>>>>
>>>>> flink-jobmanager_1     | Caused by: java.io.IOException: Corrupt
>>>>> Debezium JSON message
>>>>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
>>>>> cool street","city":"Big
>>>>> City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
>>>>> ~[flink-json-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1     | Caused by: java.lang.NullPointerException
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115)
>>>>> ~[flink-json-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>
>>>>> On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <im...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> This is a known issue in 1.11.0, and has been fixed in 1.11.1.
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Fri, 28 Aug 2020 at 06:52, Rex Fenley <Re...@remind101.com> wrote:
>>>>>>
>>>>>>> Hi again!
>>>>>>>
>>>>>>> I'm testing this out locally in Docker on Flink 1.11 first to get my
>>>>>>> bearings before downgrading to 1.10 and figuring out how to replace the
>>>>>>> Debezium connector. However, I'm getting the following error
>>>>>>> ```
>>>>>>> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
>>>>>>> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
>>>>>>> ```
>>>>>>>
>>>>>>> Any suggestions for me to fix this?
>>>>>>>
>>>>>>> code:
>>>>>>>
>>>>>>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>> val blinkStreamSettings =
>>>>>>> EnvironmentSettings
>>>>>>> .newInstance()
>>>>>>> .useBlinkPlanner()
>>>>>>> .inStreamingMode()
>>>>>>> .build()
>>>>>>> val tableEnv = StreamTableEnvironment.create(bsEnv,
>>>>>>> blinkStreamSettings)
>>>>>>>
>>>>>>> // Table from Debezium mysql example docker:
>>>>>>> //
>>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>> // | Field | Type | Null | Key | Default | Extra |
>>>>>>> //
>>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>> // | id | int(11) | NO | PRI | NULL | auto_increment |
>>>>>>> // | customer_id | int(11) | NO | MUL | NULL | |
>>>>>>> // | street | varchar(255) | NO | | NULL | |
>>>>>>> // | city | varchar(255) | NO | | NULL | |
>>>>>>> // | state | varchar(255) | NO | | NULL | |
>>>>>>> // | zip | varchar(255) | NO | | NULL | |
>>>>>>> // | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | |
>>>>>>> //
>>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>>
>>>>>>> tableEnv.executeSql("""
>>>>>>> CREATE TABLE topic_addresses (
>>>>>>> -- schema is totally the same to the MySQL "addresses" table
>>>>>>> id INT,
>>>>>>> customer_id INT,
>>>>>>> street STRING,
>>>>>>> city STRING,
>>>>>>> state STRING,
>>>>>>> zip STRING,
>>>>>>> type STRING,
>>>>>>> PRIMARY KEY (id) NOT ENFORCED
>>>>>>> ) WITH (
>>>>>>> 'connector' = 'kafka',
>>>>>>> 'topic' = 'dbserver1.inventory.addresses',
>>>>>>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>>>>> 'properties.group.id' = 'testGroup',
>>>>>>> 'format' = 'debezium-json' -- using debezium-json as the format
>>>>>>> )
>>>>>>> """)
>>>>>>>
>>>>>>> val table = tableEnv.from("topic_addresses").select($"*")
>>>>>>>
>>>>>>> // Defining a PK automatically puts it in Upsert mode, which we want.
>>>>>>> // TODO: type should be a keyword, is that acceptable by the DDL?
>>>>>>> tableEnv.executeSql("""
>>>>>>> CREATE TABLE ESAddresses (
>>>>>>> id INT,
>>>>>>> customer_id INT,
>>>>>>> street STRING,
>>>>>>> city STRING,
>>>>>>> state STRING,
>>>>>>> zip STRING,
>>>>>>> type STRING,
>>>>>>> PRIMARY KEY (id) NOT ENFORCED
>>>>>>> ) WITH (
>>>>>>> 'connector' = 'elasticsearch-7',
>>>>>>> 'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
>>>>>>> 'index' = 'flinkaddresses',
>>>>>>> 'format' = 'json'
>>>>>>> )
>>>>>>> """)
>>>>>>>
>>>>>>> table.executeInsert("ESAddresses").print()
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <Re...@remind101.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <im...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Regarding the performance difference, the proposed way will have
>>>>>>>>> one more stateful operator (deduplication) than the native 1.11 cdc
>>>>>>>>> support.
>>>>>>>>> The overhead of the deduplication operator is just similar to a
>>>>>>>>> simple group by aggregate (max on each non-key column).
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <Re...@remind101.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thank you so much for the help!
>>>>>>>>>>
>>>>>>>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <
>>>>>>>>>> marta@ververica.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yes — you'll get the full row in the payload; and you can also
>>>>>>>>>>> access the change operation, which might be useful in your case.
>>>>>>>>>>>
>>>>>>>>>>> About performance, I'm summoning Kurt and @Jark Wu
>>>>>>>>>>> <ja...@apache.org> to the thread, who will be able to give you a
>>>>>>>>>>> more complete answer and likely also some optimization tips for your
>>>>>>>>>>> specific use case.
>>>>>>>>>>>
>>>>>>>>>>> Marta
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <Re...@remind101.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Yup! This definitely helps and makes sense.
>>>>>>>>>>>>
>>>>>>>>>>>> The 'after' payload comes with all data from the row right? So
>>>>>>>>>>>> essentially inserts and updates I can insert/replace data by pk and null
>>>>>>>>>>>> values I just delete by pk, and then I can build out the rest of my joins
>>>>>>>>>>>> like normal.
>>>>>>>>>>>>
>>>>>>>>>>>> Are there any performance implications of doing it this way
>>>>>>>>>>>> that is different from the out-of-the-box 1.11 solution?
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
>>>>>>>>>>>> marta@ververica.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi, Rex.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Part of what enabled CDC support in Flink 1.11 was the
>>>>>>>>>>>>> refactoring of the table source interfaces (FLIP-95 [1]), and the new
>>>>>>>>>>>>> ScanTableSource [2], which allows emitting bounded/unbounded streams with
>>>>>>>>>>>>> insert, update and delete rows.
>>>>>>>>>>>>>
>>>>>>>>>>>>> In theory, you could consume data generated with Debezium as
>>>>>>>>>>>>> regular JSON-encoded events before Flink 1.11 — there just wasn't a
>>>>>>>>>>>>> convenient way to really treat it as "changelog". As a workaround, what you
>>>>>>>>>>>>> can do in Flink 1.10 is process these messages as JSON and extract the
>>>>>>>>>>>>> "after" field from the payload, and then apply de-duplication [3] to keep
>>>>>>>>>>>>> only the last row.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The DDL for your source table would look something like:
>>>>>>>>>>>>>
>>>>>>>>>>>>> CREATE TABLE tablename (
>>>>>>>>>>>>>   ...
>>>>>>>>>>>>>   `after` ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>   'connector' = 'kafka',
>>>>>>>>>>>>>   'format' = 'json',
>>>>>>>>>>>>>   ...
>>>>>>>>>>>>> );
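>>>>>>>>>>>>>
>>>>>>>>>>>>> A rough, untested sketch of the de-duplication step on top of that table
>>>>>>>>>>>>> (assuming `field1` is the primary key and that the table also declares a
>>>>>>>>>>>>> processing-time column, e.g. proctime AS PROCTIME()) could be:
>>>>>>>>>>>>>
>>>>>>>>>>>>> SELECT `field1`, `field2`
>>>>>>>>>>>>> FROM (
>>>>>>>>>>>>>   SELECT
>>>>>>>>>>>>>     `after`.`field1` AS `field1`,
>>>>>>>>>>>>>     `after`.`field2` AS `field2`,
>>>>>>>>>>>>>     ROW_NUMBER() OVER (PARTITION BY `after`.`field1` ORDER BY proctime DESC) AS rownum
>>>>>>>>>>>>>   FROM tablename
>>>>>>>>>>>>> )
>>>>>>>>>>>>> WHERE rownum = 1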
>>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Marta
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>>>>>>>> [2]
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>>>>>>>>> [3]
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <
>>>>>>>>>>>>> chesnay@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> @Jark Would it be possible to use the 1.11 debezium support
>>>>>>>>>>>>>> in 1.10?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm trying to set up Flink with Debezium CDC Connector on AWS
>>>>>>>>>>>>>> EMR, however, EMR only supports Flink 1.10.0, whereas Debezium Connector
>>>>>>>>>>>>>> arrived in Flink 1.11.0, from looking at the documentation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>>>>>>>>> connecting Debezium to Flink? Is there an open source Debezium connector
>>>>>>>>>>>>>> that works with Flink 1.10.0? Could I potentially pull the code out for the
>>>>>>>>>>>>>> 1.11.0 Debezium connector and compile it in my project using Flink 1.10.0
>>>>>>>>>>>>>> api?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For context, I plan on doing some fairly complicated long
>>>>>>>>>>>>>> lived stateful joins / materialization using the Table API over data
>>>>>>>>>>>>>> ingested from Postgres and possibly MySQL.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Appreciate any help, thanks!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Debezium Flink EMR

Posted by Rex Fenley <Re...@remind101.com>.
Ah, my bad, thanks for pointing that out Arvid!

On Mon, Aug 31, 2020 at 12:00 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Rex,
>
> you still forgot
>
> 'debezium-json.schema-include' = 'true'
>
> Please reread my mail.
>
>
> On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley <Re...@remind101.com> wrote:
>
>> Thanks for the input, though I've certainly included a schema as is
>> reflected earlier in this thread. Including here again
>> ...
>> tableEnv.executeSql("""
>> CREATE TABLE topic_addresses (
>> -- schema is totally the same to the MySQL "addresses" table
>> id INT,
>> customer_id INT,
>> street STRING,
>> city STRING,
>> state STRING,
>> zip STRING,
>> type STRING,
>> PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>> 'connector' = 'kafka',
>> 'topic' = 'dbserver1.inventory.addresses',
>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>> 'properties.group.id' = 'testGroup',
>> 'format' = 'debezium-json' -- using debezium-json as the format
>> )
>> """)
>>
>> val table = tableEnv.from("topic_addresses").select($"*")
>> ...
>>
>> On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Rex,
>>>
>>> the connector expects a value without a schema, but the message contains
>>> a schema. You can tell Flink that the schema is included as written in the
>>> documentation [1].
>>>
>>> CREATE TABLE topic_products (
>>>   -- schema is totally the same to the MySQL "products" table
>>>   id BIGINT,
>>>   name STRING,
>>>   description STRING,
>>>   weight DECIMAL(10, 2)
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'products_binlog',
>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>   'properties.group.id' = 'testGroup',
>>>   'format' = 'debezium-json',
>>>   'debezium-json.schema-include' = 'true'
>>> )
>>>
>>> @Jark Wu <im...@gmail.com> , it would be probably good to make the
>>> connector more robust and catch these types of misconfigurations.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
>>>
>>> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <Re...@remind101.com> wrote:
>>>
>>>> Awesome, so that took me a step further. When running it, I'm receiving an
>>>> error, however. FYI, my docker-compose file is based on the Debezium MySQL
>>>> tutorial, which can be found here:
>>>> https://debezium.io/documentation/reference/1.2/tutorial.html
>>>>
>>>> Part of the stack trace:
>>>>
>>>> flink-jobmanager_1     | Caused by: java.io.IOException: Corrupt
>>>> Debezium JSON message
>>>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
>>>> cool street","city":"Big
>>>> City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
>>>> ~[flink-json-1.11.1.jar:1.11.1]
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>>>> ~[?:?]
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>>>> ~[?:?]
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>>> ~[?:?]
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>>> ~[?:?]
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>> flink-jobmanager_1     | Caused by: java.lang.NullPointerException
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115)
>>>> ~[flink-json-1.11.1.jar:1.11.1]
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>>>> ~[?:?]
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>>>> ~[?:?]
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>>> ~[?:?]
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>>> ~[?:?]
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>> flink-jobmanager_1     | at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>
>>>> On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <im...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> This is a known issue in 1.11.0, and has been fixed in 1.11.1.
>>>>>
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Fri, 28 Aug 2020 at 06:52, Rex Fenley <Re...@remind101.com> wrote:
>>>>>
>>>>>> Hi again!
>>>>>>
>>>>>> I'm testing this out locally in Docker on Flink 1.11 first to get my
>>>>>> bearings before downgrading to 1.10 and figuring out how to replace the
>>>>>> Debezium connector. However, I'm getting the following error
>>>>>> ```
>>>>>> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
>>>>>> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
>>>>>> ```
>>>>>>
>>>>>> Any suggestions for me to fix this?
>>>>>>
>>>>>> code:
>>>>>>
>>>>>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>> val blinkStreamSettings =
>>>>>> EnvironmentSettings
>>>>>> .newInstance()
>>>>>> .useBlinkPlanner()
>>>>>> .inStreamingMode()
>>>>>> .build()
>>>>>> val tableEnv = StreamTableEnvironment.create(bsEnv,
>>>>>> blinkStreamSettings)
>>>>>>
>>>>>> // Table from Debezium mysql example docker:
>>>>>> //
>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>> // | Field | Type | Null | Key | Default | Extra |
>>>>>> //
>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>> // | id | int(11) | NO | PRI | NULL | auto_increment |
>>>>>> // | customer_id | int(11) | NO | MUL | NULL | |
>>>>>> // | street | varchar(255) | NO | | NULL | |
>>>>>> // | city | varchar(255) | NO | | NULL | |
>>>>>> // | state | varchar(255) | NO | | NULL | |
>>>>>> // | zip | varchar(255) | NO | | NULL | |
>>>>>> // | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | |
>>>>>> //
>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>
>>>>>> tableEnv.executeSql("""
>>>>>> CREATE TABLE topic_addresses (
>>>>>> -- schema is totally the same to the MySQL "addresses" table
>>>>>> id INT,
>>>>>> customer_id INT,
>>>>>> street STRING,
>>>>>> city STRING,
>>>>>> state STRING,
>>>>>> zip STRING,
>>>>>> type STRING,
>>>>>> PRIMARY KEY (id) NOT ENFORCED
>>>>>> ) WITH (
>>>>>> 'connector' = 'kafka',
>>>>>> 'topic' = 'dbserver1.inventory.addresses',
>>>>>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>>>> 'properties.group.id' = 'testGroup',
>>>>>> 'format' = 'debezium-json' -- using debezium-json as the format
>>>>>> )
>>>>>> """)
>>>>>>
>>>>>> val table = tableEnv.from("topic_addresses").select($"*")
>>>>>>
>>>>>> // Defining a PK automatically puts it in Upsert mode, which we want.
>>>>>> // TODO: type should be a keyword, is that acceptable by the DDL?
>>>>>> tableEnv.executeSql("""
>>>>>> CREATE TABLE ESAddresses (
>>>>>> id INT,
>>>>>> customer_id INT,
>>>>>> street STRING,
>>>>>> city STRING,
>>>>>> state STRING,
>>>>>> zip STRING,
>>>>>> type STRING,
>>>>>> PRIMARY KEY (id) NOT ENFORCED
>>>>>> ) WITH (
>>>>>> 'connector' = 'elasticsearch-7',
>>>>>> 'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
>>>>>> 'index' = 'flinkaddresses',
>>>>>> 'format' = 'json'
>>>>>> )
>>>>>> """)
>>>>>>
>>>>>> table.executeInsert("ESAddresses").print()
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <Re...@remind101.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <im...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Regarding the performance difference, the proposed way will have
>>>>>>>> one more stateful operator (deduplication) than the native 1.11 cdc
>>>>>>>> support.
>>>>>>>> The overhead of the deduplication operator is just similar to a
>>>>>>>> simple group by aggregate (max on each non-key column).
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jark
>>>>>>>>
>>>>>>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <Re...@remind101.com> wrote:
>>>>>>>>
>>>>>>>>> Thank you so much for the help!
>>>>>>>>>
>>>>>>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <
>>>>>>>>> marta@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>>> Yes — you'll get the full row in the payload; and you can also
>>>>>>>>>> access the change operation, which might be useful in your case.
>>>>>>>>>>
>>>>>>>>>> About performance, I'm summoning Kurt and @Jark Wu
>>>>>>>>>> <ja...@apache.org> to the thread, who will be able to give you a
>>>>>>>>>> more complete answer and likely also some optimization tips for your
>>>>>>>>>> specific use case.
>>>>>>>>>>
>>>>>>>>>> Marta
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <Re...@remind101.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yup! This definitely helps and makes sense.
>>>>>>>>>>>
>>>>>>>>>>> The 'after' payload comes with all data from the row right? So
>>>>>>>>>>> essentially inserts and updates I can insert/replace data by pk and null
>>>>>>>>>>> values I just delete by pk, and then I can build out the rest of my joins
>>>>>>>>>>> like normal.
>>>>>>>>>>>
>>>>>>>>>>> Are there any performance implications of doing it this way that
>>>>>>>>>>> is different from the out-of-the-box 1.11 solution?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
>>>>>>>>>>> marta@ververica.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi, Rex.
>>>>>>>>>>>>
>>>>>>>>>>>> Part of what enabled CDC support in Flink 1.11 was the
>>>>>>>>>>>> refactoring of the table source interfaces (FLIP-95 [1]), and the new
>>>>>>>>>>>> ScanTableSource [2], which allows emitting bounded/unbounded streams with
>>>>>>>>>>>> insert, update and delete rows.
>>>>>>>>>>>>
>>>>>>>>>>>> In theory, you could consume data generated with Debezium as
>>>>>>>>>>>> regular JSON-encoded events before Flink 1.11 — there just wasn't a
>>>>>>>>>>>> convenient way to really treat it as "changelog". As a workaround, what you
>>>>>>>>>>>> can do in Flink 1.10 is process these messages as JSON and extract the
>>>>>>>>>>>> "after" field from the payload, and then apply de-duplication [3] to keep
>>>>>>>>>>>> only the last row.
>>>>>>>>>>>>
>>>>>>>>>>>> The DDL for your source table would look something like:
>>>>>>>>>>>>
>>>>>>>>>>>> CREATE TABLE tablename (
>>>>>>>>>>>>   ...
>>>>>>>>>>>>   `after` ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>   'connector' = 'kafka',
>>>>>>>>>>>>   'format' = 'json',
>>>>>>>>>>>>   ...
>>>>>>>>>>>> );
>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>
>>>>>>>>>>>> Marta
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>>>>>>> [2]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>>>>>>>> [3]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <
>>>>>>>>>>>> chesnay@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> @Jark Would it be possible to use the 1.11 debezium support in
>>>>>>>>>>>>> 1.10?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm trying to set up Flink with Debezium CDC Connector on AWS
>>>>>>>>>>>>> EMR, however, EMR only supports Flink 1.10.0, whereas Debezium Connector
>>>>>>>>>>>>> arrived in Flink 1.11.0, from looking at the documentation.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>>>>>>>> connecting Debezium to Flink? Is there an open source Debezium connector
>>>>>>>>>>>>> that works with Flink 1.10.0? Could I potentially pull the code out for the
>>>>>>>>>>>>> 1.11.0 Debezium connector and compile it in my project using Flink 1.10.0
>>>>>>>>>>>>> api?
>>>>>>>>>>>>>
>>>>>>>>>>>>> For context, I plan on doing some fairly complicated long
>>>>>>>>>>>>> lived stateful joins / materialization using the Table API over data
>>>>>>>>>>>>> ingested from Postgres and possibly MySQL.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Appreciate any help, thanks!
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>>
>>>>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Debezium Flink EMR

Posted by Arvid Heise <ar...@ververica.com>.
Hi Rex,

you still forgot

'debezium-json.schema-include' = 'true'

Please reread my mail.
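
For clarity, your topic_addresses DDL from earlier in the thread would then look
roughly like this (untested sketch, keeping your topic and broker names):

CREATE TABLE topic_addresses (
  id INT,
  customer_id INT,
  street STRING,
  city STRING,
  state STRING,
  zip STRING,
  type STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.inventory.addresses',
  'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'true' -- the previously missing option
)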


On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley <Re...@remind101.com> wrote:

> Thanks for the input, though I've certainly included a schema as is
> reflected earlier in this thread. Including here again
> ...
> tableEnv.executeSql("""
> CREATE TABLE topic_addresses (
> -- schema is totally the same to the MySQL "addresses" table
> id INT,
> customer_id INT,
> street STRING,
> city STRING,
> state STRING,
> zip STRING,
> type STRING,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'dbserver1.inventory.addresses',
> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
> 'properties.group.id' = 'testGroup',
> 'format' = 'debezium-json' -- using debezium-json as the format
> )
> """)
>
> val table = tableEnv.from("topic_addresses").select($"*")
> ...
>
> On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Rex,
>>
>> the connector expects a value without a schema, but the message contains
>> a schema. You can tell Flink that the schema is included as written in the
>> documentation [1].
>>
>> CREATE TABLE topic_products (
>>   -- schema is totally the same to the MySQL "products" table
>>   id BIGINT,
>>   name STRING,
>>   description STRING,
>>   weight DECIMAL(10, 2)
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'products_binlog',
>>   'properties.bootstrap.servers' = 'localhost:9092',
>>   'properties.group.id' = 'testGroup',
>>   'format' = 'debezium-json',
>>   'debezium-json.schema-include' = 'true'
>> )
>>
>> @Jark Wu <im...@gmail.com> , it would be probably good to make the
>> connector more robust and catch these types of misconfigurations.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
>>
>> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <Re...@remind101.com> wrote:
>>
>>> Awesome, so that took me a step further. When running it, I'm receiving an
>>> error, however. FYI, my docker-compose file is based on the Debezium MySQL
>>> tutorial, which can be found here:
>>> https://debezium.io/documentation/reference/1.2/tutorial.html
>>>
>>> Part of the stack trace:
>>>
>>> flink-jobmanager_1     | Caused by: java.io.IOException: Corrupt
>>> Debezium JSON message
>>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
>>> cool street","city":"Big
>>> City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
>>> flink-jobmanager_1     | at
>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
>>> ~[flink-json-1.11.1.jar:1.11.1]
>>> flink-jobmanager_1     | at
>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>>> ~[?:?]
>>> flink-jobmanager_1     | at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>>> ~[?:?]
>>> flink-jobmanager_1     | at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>> ~[?:?]
>>> flink-jobmanager_1     | at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>> ~[?:?]
>>> flink-jobmanager_1     | at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>> flink-jobmanager_1     | at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>> flink-jobmanager_1     | at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>> flink-jobmanager_1     | Caused by: java.lang.NullPointerException
>>> flink-jobmanager_1     | at
>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115)
>>> ~[flink-json-1.11.1.jar:1.11.1]
>>> flink-jobmanager_1     | at
>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>>> ~[?:?]
>>> flink-jobmanager_1     | at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>>> ~[?:?]
>>> flink-jobmanager_1     | at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>> ~[?:?]
>>> flink-jobmanager_1     | at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>> ~[?:?]
>>> flink-jobmanager_1     | at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>> flink-jobmanager_1     | at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>> flink-jobmanager_1     | at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>
>>> On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <im...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> This is a known issue in 1.11.0, and has been fixed in 1.11.1.
>>>>
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Fri, 28 Aug 2020 at 06:52, Rex Fenley <Re...@remind101.com> wrote:
>>>>
>>>>> Hi again!
>>>>>
>>>>> I'm testing this out locally in Docker on Flink 1.11 first to get my
>>>>> bearings before downgrading to 1.10 and figuring out how to replace the
>>>>> Debezium connector. However, I'm getting the following error
>>>>> ```
>>>>> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
>>>>> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
>>>>> ```
>>>>>
>>>>> Any suggestions for me to fix this?
>>>>>
>>>>> code:
>>>>>
>>>>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>> val blinkStreamSettings =
>>>>> EnvironmentSettings
>>>>> .newInstance()
>>>>> .useBlinkPlanner()
>>>>> .inStreamingMode()
>>>>> .build()
>>>>> val tableEnv = StreamTableEnvironment.create(bsEnv,
>>>>> blinkStreamSettings)
>>>>>
>>>>> // Table from Debezium mysql example docker:
>>>>> //
>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>> // | Field | Type | Null | Key | Default | Extra |
>>>>> //
>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>> // | id | int(11) | NO | PRI | NULL | auto_increment |
>>>>> // | customer_id | int(11) | NO | MUL | NULL | |
>>>>> // | street | varchar(255) | NO | | NULL | |
>>>>> // | city | varchar(255) | NO | | NULL | |
>>>>> // | state | varchar(255) | NO | | NULL | |
>>>>> // | zip | varchar(255) | NO | | NULL | |
>>>>> // | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | |
>>>>> //
>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>
>>>>> tableEnv.executeSql("""
>>>>> CREATE TABLE topic_addresses (
>>>>> -- schema is totally the same to the MySQL "addresses" table
>>>>> id INT,
>>>>> customer_id INT,
>>>>> street STRING,
>>>>> city STRING,
>>>>> state STRING,
>>>>> zip STRING,
>>>>> type STRING,
>>>>> PRIMARY KEY (id) NOT ENFORCED
>>>>> ) WITH (
>>>>> 'connector' = 'kafka',
>>>>> 'topic' = 'dbserver1.inventory.addresses',
>>>>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>>> 'properties.group.id' = 'testGroup',
>>>>> 'format' = 'debezium-json' -- using debezium-json as the format
>>>>> )
>>>>> """)
>>>>>
>>>>> val table = tableEnv.from("topic_addresses").select($"*")
>>>>>
>>>>> // Defining a PK automatically puts it in Upsert mode, which we want.
>>>>> // TODO: type should be a keyword, is that acceptable by the DDL?
>>>>> tableEnv.executeSql("""
>>>>> CREATE TABLE ESAddresses (
>>>>> id INT,
>>>>> customer_id INT,
>>>>> street STRING,
>>>>> city STRING,
>>>>> state STRING,
>>>>> zip STRING,
>>>>> type STRING,
>>>>> PRIMARY KEY (id) NOT ENFORCED
>>>>> ) WITH (
>>>>> 'connector' = 'elasticsearch-7',
>>>>> 'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
>>>>> 'index' = 'flinkaddresses',
>>>>> 'format' = 'json'
>>>>> )
>>>>> """)
>>>>>
>>>>> table.executeInsert("ESAddresses").print()
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <Re...@remind101.com> wrote:
>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <im...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Regarding the performance difference, the proposed way will have one
>>>>>>> more stateful operator (deduplication) than the native 1.11 cdc support.
>>>>>>> The overhead of the deduplication operator is just similar to a
>>>>>>> simple group by aggregate (max on each non-key column).
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <Re...@remind101.com> wrote:
>>>>>>>
>>>>>>>> Thank you so much for the help!
>>>>>>>>
>>>>>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <
>>>>>>>> marta@ververica.com> wrote:
>>>>>>>>
>>>>>>>>> Yes — you'll get the full row in the payload; and you can also
>>>>>>>>> access the change operation, which might be useful in your case.
>>>>>>>>>
>>>>>>>>> About performance, I'm summoning Kurt and @Jark Wu
>>>>>>>>> <ja...@apache.org> to the thread, who will be able to give you a
>>>>>>>>> more complete answer and likely also some optimization tips for your
>>>>>>>>> specific use case.
>>>>>>>>>
>>>>>>>>> Marta
>>>>>>>>>
>>>>>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <Re...@remind101.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Yup! This definitely helps and makes sense.
>>>>>>>>>>
>>>>>>>>>> The 'after' payload comes with all data from the row, right? So
>>>>>>>>>> essentially, for inserts and updates I can insert/replace data by pk, for
>>>>>>>>>> null values I just delete by pk, and then I can build out the rest of my
>>>>>>>>>> joins like normal.
>>>>>>>>>>
>>>>>>>>>> Are there any performance implications of doing it this way that
>>>>>>>>>> is different from the out-of-the-box 1.11 solution?
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
>>>>>>>>>> marta@ververica.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi, Rex.
>>>>>>>>>>>
>>>>>>>>>>> Part of what enabled CDC support in Flink 1.11 was the
>>>>>>>>>>> refactoring of the table source interfaces (FLIP-95 [1]), and the new
>>>>>>>>>>> ScanTableSource [2], which allows to emit bounded/unbounded streams with
>>>>>>>>>>> insert, update and delete rows.
>>>>>>>>>>>
>>>>>>>>>>> In theory, you could consume data generated with Debezium as
>>>>>>>>>>> regular JSON-encoded events before Flink 1.11 — there just wasn't a
>>>>>>>>>>> convenient way to really treat it as "changelog". As a workaround, what you
>>>>>>>>>>> can do in Flink 1.10 is process these messages as JSON and extract the
>>>>>>>>>>> "after" field from the payload, and then apply de-duplication [3] to keep
>>>>>>>>>>> only the last row.
>>>>>>>>>>>
>>>>>>>>>>> The DDL for your source table would look something like:
>>>>>>>>>>>
>>>>>>>>>>> CREATE TABLE tablename ( ... after ROW(`field1` DATATYPE,
>>>>>>>>>>> `field2` DATATYPE, ...) ) WITH ( 'connector' = 'kafka', 'format'
>>>>>>>>>>> = 'json', ... );
>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>
>>>>>>>>>>> Marta
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>>>>>> [2]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>>>>>>> [3]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <
>>>>>>>>>>> chesnay@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> @Jark Would it be possible to use the 1.11 debezium support in
>>>>>>>>>>>> 1.10?
>>>>>>>>>>>>
>>>>>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm trying to set up Flink with Debezium CDC Connector on AWS
>>>>>>>>>>>> EMR, however, EMR only supports Flink 1.10.0, whereas Debezium Connector
>>>>>>>>>>>> arrived in Flink 1.11.0, from looking at the documentation.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>>>>>>
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>>>>>>
>>>>>>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>>>>>>> connecting Debezium to Flink? Is there an open source Debezium connector
>>>>>>>>>>>> that works with Flink 1.10.0? Could I potentially pull the code out for the
>>>>>>>>>>>> 1.11.0 Debezium connector and compile it in my project using Flink 1.10.0
>>>>>>>>>>>> api?
>>>>>>>>>>>>
>>>>>>>>>>>> For context, I plan on doing some fairly complicated long lived
>>>>>>>>>>>> stateful joins / materialization using the Table API over data ingested
>>>>>>>>>>>> from Postgres and possibly MySQL.
>>>>>>>>>>>>
>>>>>>>>>>>> Appreciate any help, thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>>
>>>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>>
>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>
>>>>>>>>
>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>
>>>>>>
>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>> <https://www.facebook.com/remindhq>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>
>>>>>
>>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>>> <https://www.facebook.com/remindhq>
>>>>>
>>>>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Debezium Flink EMR

Posted by Rex Fenley <Re...@remind101.com>.
Thanks for the input, though I've certainly included a schema, as is
reflected earlier in this thread. Including it here again:
...
tableEnv.executeSql("""
CREATE TABLE topic_addresses (
-- schema is totally the same to the MySQL "addresses" table
id INT,
customer_id INT,
street STRING,
city STRING,
state STRING,
zip STRING,
type STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.inventory.addresses',
'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
'properties.group.id' = 'testGroup',
'format' = 'debezium-json' -- using debezium-json as the format
)
""")

val table = tableEnv.from("topic_addresses").select($"*")
...

On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Rex,
>
> the connector expects a value without a schema, but the message contains a
> schema. You can tell Flink that the schema is included as written in the
> documentation [1].
>
> CREATE TABLE topic_products (
>   -- schema is totally the same to the MySQL "products" table
>   id BIGINT,
>   name STRING,
>   description STRING,
>   weight DECIMAL(10, 2)) WITH (
>  'connector' = 'kafka',
>  'topic' = 'products_binlog',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'debezium-json',
>  'debezium-json.schema-include' = 'true')
>
> @Jark Wu <im...@gmail.com>, it would probably be good to make the
> connector more robust and catch these types of misconfigurations.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
>
> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <Re...@remind101.com> wrote:
>
>> Awesome, so that took me a step further. When running, I'm receiving an
>> error, however. FYI, my docker-compose file is based on the Debezium MySQL
>> tutorial, which can be found here:
>> https://debezium.io/documentation/reference/1.2/tutorial.html
>>
>> Part of the stack trace:
>>
>> flink-jobmanager_1     | Caused by: java.io.IOException: Corrupt Debezium
>> JSON message
>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
>> cool street","city":"Big
>> City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
>> flink-jobmanager_1     | at
>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
>> ~[flink-json-1.11.1.jar:1.11.1]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>> ~[?:?]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>> ~[?:?]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>> ~[?:?]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>> ~[?:?]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> flink-jobmanager_1     | Caused by: java.lang.NullPointerException
>> flink-jobmanager_1     | at
>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115)
>> ~[flink-json-1.11.1.jar:1.11.1]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>> ~[?:?]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>> ~[?:?]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>> ~[?:?]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>> ~[?:?]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> flink-jobmanager_1     | at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>
>> On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <im...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> This is a known issue in 1.11.0, and has been fixed in 1.11.1.
>>>
>>>
>>> Best,
>>> Jark
>>>
>>> On Fri, 28 Aug 2020 at 06:52, Rex Fenley <Re...@remind101.com> wrote:
>>>
>>>> Hi again!
>>>>
>>>> I'm testing things out locally in Docker on Flink 1.11 first to get my bearings
>>>> before downgrading to 1.10 and figuring out how to replace the Debezium
>>>> connector. However, I'm getting the following error
>>>> ```
>>>> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
>>>> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
>>>> ```
>>>>
>>>> Any suggestions for me to fix this?
>>>>
>>>> code:
>>>>
>>>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>> val blinkStreamSettings =
>>>> EnvironmentSettings
>>>> .newInstance()
>>>> .useBlinkPlanner()
>>>> .inStreamingMode()
>>>> .build()
>>>> val tableEnv = StreamTableEnvironment.create(bsEnv,
>>>> blinkStreamSettings)
>>>>
>>>> // Table from Debezium mysql example docker:
>>>> //
>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>> // | Field | Type | Null | Key | Default | Extra |
>>>> //
>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>> // | id | int(11) | NO | PRI | NULL | auto_increment |
>>>> // | customer_id | int(11) | NO | MUL | NULL | |
>>>> // | street | varchar(255) | NO | | NULL | |
>>>> // | city | varchar(255) | NO | | NULL | |
>>>> // | state | varchar(255) | NO | | NULL | |
>>>> // | zip | varchar(255) | NO | | NULL | |
>>>> // | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | |
>>>> //
>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>
>>>> tableEnv.executeSql("""
>>>> CREATE TABLE topic_addresses (
>>>> -- schema is totally the same to the MySQL "addresses" table
>>>> id INT,
>>>> customer_id INT,
>>>> street STRING,
>>>> city STRING,
>>>> state STRING,
>>>> zip STRING,
>>>> type STRING,
>>>> PRIMARY KEY (id) NOT ENFORCED
>>>> ) WITH (
>>>> 'connector' = 'kafka',
>>>> 'topic' = 'dbserver1.inventory.addresses',
>>>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>> 'properties.group.id' = 'testGroup',
>>>> 'format' = 'debezium-json' -- using debezium-json as the format
>>>> )
>>>> """)
>>>>
>>>> val table = tableEnv.from("topic_addresses").select($"*")
>>>>
>>>> // Defining a PK automatically puts it in Upsert mode, which we want.
>>>> // TODO: type should be a keyword, is that acceptable by the DDL?
>>>> tableEnv.executeSql("""
>>>> CREATE TABLE ESAddresses (
>>>> id INT,
>>>> customer_id INT,
>>>> street STRING,
>>>> city STRING,
>>>> state STRING,
>>>> zip STRING,
>>>> type STRING,
>>>> PRIMARY KEY (id) NOT ENFORCED
>>>> ) WITH (
>>>> 'connector' = 'elasticsearch-7',
>>>> 'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
>>>> 'index' = 'flinkaddresses',
>>>> 'format' = 'json'
>>>> )
>>>> """)
>>>>
>>>> table.executeInsert("ESAddresses").print()
>>>>
>>>> Thanks!
>>>>
>>>> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <Re...@remind101.com> wrote:
>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <im...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Regarding the performance difference, the proposed way will have one
>>>>>> more stateful operator (deduplication) than the native 1.11 cdc support.
>>>>>> The overhead of the deduplication operator is just similar to a
>>>>>> simple group by aggregate (max on each non-key column).
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <Re...@remind101.com> wrote:
>>>>>>
>>>>>>> Thank you so much for the help!
>>>>>>>
>>>>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <
>>>>>>> marta@ververica.com> wrote:
>>>>>>>
>>>>>>>> Yes — you'll get the full row in the payload; and you can also
>>>>>>>> access the change operation, which might be useful in your case.
>>>>>>>>
>>>>>>>> About performance, I'm summoning Kurt and @Jark Wu
>>>>>>>> <ja...@apache.org> to the thread, who will be able to give you a
>>>>>>>> more complete answer and likely also some optimization tips for your
>>>>>>>> specific use case.
>>>>>>>>
>>>>>>>> Marta
>>>>>>>>
>>>>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <Re...@remind101.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Yup! This definitely helps and makes sense.
>>>>>>>>>
>>>>>>>>> The 'after' payload comes with all data from the row, right? So
>>>>>>>>> essentially, for inserts and updates I can insert/replace data by pk, for
>>>>>>>>> null values I just delete by pk, and then I can build out the rest of my
>>>>>>>>> joins like normal.
>>>>>>>>>
>>>>>>>>> Are there any performance implications of doing it this way that
>>>>>>>>> is different from the out-of-the-box 1.11 solution?
>>>>>>>>>
>>>>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
>>>>>>>>> marta@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi, Rex.
>>>>>>>>>>
>>>>>>>>>> Part of what enabled CDC support in Flink 1.11 was the
>>>>>>>>>> refactoring of the table source interfaces (FLIP-95 [1]), and the new
>>>>>>>>>> ScanTableSource [2], which allows to emit bounded/unbounded streams with
>>>>>>>>>> insert, update and delete rows.
>>>>>>>>>>
>>>>>>>>>> In theory, you could consume data generated with Debezium as
>>>>>>>>>> regular JSON-encoded events before Flink 1.11 — there just wasn't a
>>>>>>>>>> convenient way to really treat it as "changelog". As a workaround, what you
>>>>>>>>>> can do in Flink 1.10 is process these messages as JSON and extract the
>>>>>>>>>> "after" field from the payload, and then apply de-duplication [3] to keep
>>>>>>>>>> only the last row.
>>>>>>>>>>
>>>>>>>>>> The DDL for your source table would look something like:
>>>>>>>>>>
>>>>>>>>>> CREATE TABLE tablename ( ... after ROW(`field1` DATATYPE,
>>>>>>>>>> `field2` DATATYPE, ...) ) WITH ( 'connector' = 'kafka', 'format'
>>>>>>>>>> = 'json', ... );
>>>>>>>>>> Hope this helps!
>>>>>>>>>>
>>>>>>>>>> Marta
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>>>>> [2]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>>>>>> [3]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <
>>>>>>>>>> chesnay@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> @Jark Would it be possible to use the 1.11 debezium support in
>>>>>>>>>>> 1.10?
>>>>>>>>>>>
>>>>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I'm trying to set up Flink with Debezium CDC Connector on AWS
>>>>>>>>>>> EMR, however, EMR only supports Flink 1.10.0, whereas Debezium Connector
>>>>>>>>>>> arrived in Flink 1.11.0, from looking at the documentation.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>>>>>
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>>>>>
>>>>>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>>>>>> connecting Debezium to Flink? Is there an open source Debezium connector
>>>>>>>>>>> that works with Flink 1.10.0? Could I potentially pull the code out for the
>>>>>>>>>>> 1.11.0 Debezium connector and compile it in my project using Flink 1.10.0
>>>>>>>>>>> api?
>>>>>>>>>>>
>>>>>>>>>>> For context, I plan on doing some fairly complicated long lived
>>>>>>>>>>> stateful joins / materialization using the Table API over data ingested
>>>>>>>>>>> from Postgres and possibly MySQL.
>>>>>>>>>>>
>>>>>>>>>>> Appreciate any help, thanks!
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>>
>>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>
>>>>>>>
>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>
>>>>>
>>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>>> <https://www.facebook.com/remindhq>
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Debezium Flink EMR

Posted by Arvid Heise <ar...@ververica.com>.
Hi Rex,

the connector expects a value without a schema, but the message contains a
schema. You can tell Flink that the schema is included as written in the
documentation [1].
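
Concretely, the record in your stack trace is wrapped in the Kafka Connect
envelope, while the plain 'debezium-json' format expects the change event
itself at the top level. Roughly (trimmed for illustration, not the exact
payload):

  what the topic currently contains:
    {"schema": {...}, "payload": {"before": null, "after": {...}, "op": "c", ...}}
  what the format expects by default:
    {"before": null, "after": {...}, "op": "c", ...}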

CREATE TABLE topic_products (
  -- schema is totally the same to the MySQL "products" table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'debezium-json',
 'debezium-json.schema-include' = 'true')

@Jark Wu <im...@gmail.com>, it would probably be good to make the
connector more robust and catch these types of misconfigurations.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
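
Applied to your addresses table, that is one extra option in the WITH
clause. A sketch against 1.11.1 (untested here, so treat it as a starting
point rather than a drop-in):

CREATE TABLE topic_addresses (
  id INT,
  customer_id INT,
  street STRING,
  city STRING,
  state STRING,
  zip STRING,
  type STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.inventory.addresses',
  'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-json',
  -- expect the Connect envelope ("schema" + "payload") in every record
  'debezium-json.schema-include' = 'true'
)

Alternatively, if you would rather leave the Flink DDL untouched, you can
stop Kafka Connect from embedding the schema in the first place by setting
key.converter.schemas.enable and value.converter.schemas.enable to false
for the connector (standard JsonConverter settings; I am assuming the
tutorial's default converter setup here), so that each record carries only
the payload.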

On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <Re...@remind101.com> wrote:

> Awesome, so that took me a step further. When running, I'm receiving an
> error, however. FYI, my docker-compose file is based on the Debezium MySQL
> tutorial, which can be found here:
> https://debezium.io/documentation/reference/1.2/tutorial.html
>
> Part of the stack trace:
>
> flink-jobmanager_1     | Caused by: java.io.IOException: Corrupt Debezium
> JSON message
> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
> cool street","city":"Big
> City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
> flink-jobmanager_1     | at
> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
> ~[flink-json-1.11.1.jar:1.11.1]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-jobmanager_1     | Caused by: java.lang.NullPointerException
> flink-jobmanager_1     | at
> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115)
> ~[flink-json-1.11.1.jar:1.11.1]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>
> On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <im...@gmail.com> wrote:
>
>> Hi,
>>
>> This is a known issue in 1.11.0, and has been fixed in 1.11.1.
>>
>>
>> Best,
>> Jark
>>
>> On Fri, 28 Aug 2020 at 06:52, Rex Fenley <Re...@remind101.com> wrote:
>>
>>> Hi again!
>>>
>>> I'm testing things out locally in Docker on Flink 1.11 first to get my bearings
>>> before downgrading to 1.10 and figuring out how to replace the Debezium
>>> connector. However, I'm getting the following error
>>> ```
>>> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
>>> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
>>> ```
>>>
>>> Any suggestions for me to fix this?
>>>
>>> code:
>>>
>>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>> val blinkStreamSettings =
>>> EnvironmentSettings
>>> .newInstance()
>>> .useBlinkPlanner()
>>> .inStreamingMode()
>>> .build()
>>> val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)
>>>
>>> // Table from Debezium mysql example docker:
>>> //
>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>> // | Field | Type | Null | Key | Default | Extra |
>>> //
>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>> // | id | int(11) | NO | PRI | NULL | auto_increment |
>>> // | customer_id | int(11) | NO | MUL | NULL | |
>>> // | street | varchar(255) | NO | | NULL | |
>>> // | city | varchar(255) | NO | | NULL | |
>>> // | state | varchar(255) | NO | | NULL | |
>>> // | zip | varchar(255) | NO | | NULL | |
>>> // | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | |
>>> //
>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>
>>> tableEnv.executeSql("""
>>> CREATE TABLE topic_addresses (
>>> -- schema is totally the same to the MySQL "addresses" table
>>> id INT,
>>> customer_id INT,
>>> street STRING,
>>> city STRING,
>>> state STRING,
>>> zip STRING,
>>> type STRING,
>>> PRIMARY KEY (id) NOT ENFORCED
>>> ) WITH (
>>> 'connector' = 'kafka',
>>> 'topic' = 'dbserver1.inventory.addresses',
>>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>> 'properties.group.id' = 'testGroup',
>>> 'format' = 'debezium-json' -- using debezium-json as the format
>>> )
>>> """)
>>>
>>> val table = tableEnv.from("topic_addresses").select($"*")
>>>
>>> // Defining a PK automatically puts it in Upsert mode, which we want.
>>> // TODO: type should be a keyword, is that acceptable by the DDL?
>>> tableEnv.executeSql("""
>>> CREATE TABLE ESAddresses (
>>> id INT,
>>> customer_id INT,
>>> street STRING,
>>> city STRING,
>>> state STRING,
>>> zip STRING,
>>> type STRING,
>>> PRIMARY KEY (id) NOT ENFORCED
>>> ) WITH (
>>> 'connector' = 'elasticsearch-7',
>>> 'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
>>> 'index' = 'flinkaddresses',
>>> 'format' = 'json'
>>> )
>>> """)
>>>
>>> table.executeInsert("ESAddresses").print()
>>>
>>> Thanks!
>>>
>>> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <Re...@remind101.com> wrote:
>>>
>>>> Thanks!
>>>>
>>>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <im...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Regarding the performance difference, the proposed way will have one
>>>>> more stateful operator (deduplication) than the native 1.11 cdc support.
>>>>> The overhead of the deduplication operator is just similar to a simple
>>>>> group by aggregate (max on each non-key column).
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <Re...@remind101.com> wrote:
>>>>>
>>>>>> Thank you so much for the help!
>>>>>>
>>>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <
>>>>>> marta@ververica.com> wrote:
>>>>>>
>>>>>>> Yes — you'll get the full row in the payload; and you can also
>>>>>>> access the change operation, which might be useful in your case.
>>>>>>>
>>>>>>> About performance, I'm summoning Kurt and @Jark Wu <ja...@apache.org> to
>>>>>>> the thread, who will be able to give you a more complete answer and likely
>>>>>>> also some optimization tips for your specific use case.
>>>>>>>
>>>>>>> Marta
>>>>>>>
>>>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <Re...@remind101.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yup! This definitely helps and makes sense.
>>>>>>>>
>>>>>>>> The 'after' payload comes with all data from the row, right? So
>>>>>>>> essentially, for inserts and updates I can insert/replace data by pk, for
>>>>>>>> null values I just delete by pk, and then I can build out the rest of my
>>>>>>>> joins like normal.
>>>>>>>>
>>>>>>>> Are there any performance implications of doing it this way that is
>>>>>>>> different from the out-of-the-box 1.11 solution?
>>>>>>>>
>>>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
>>>>>>>> marta@ververica.com> wrote:
>>>>>>>>
>>>>>>>>> Hi, Rex.
>>>>>>>>>
>>>>>>>>> Part of what enabled CDC support in Flink 1.11 was the refactoring
>>>>>>>>> of the table source interfaces (FLIP-95 [1]), and the new ScanTableSource
>>>>>>>>> [2], which allows to emit bounded/unbounded streams with insert, update and
>>>>>>>>> delete rows.
>>>>>>>>>
>>>>>>>>> In theory, you could consume data generated with Debezium as
>>>>>>>>> regular JSON-encoded events before Flink 1.11 — there just wasn't a
>>>>>>>>> convenient way to really treat it as "changelog". As a workaround, what you
>>>>>>>>> can do in Flink 1.10 is process these messages as JSON and extract the
>>>>>>>>> "after" field from the payload, and then apply de-duplication [3] to keep
>>>>>>>>> only the last row.
>>>>>>>>>
>>>>>>>>> The DDL for your source table would look something like:
>>>>>>>>>
>>>>>>>>> CREATE TABLE tablename ( ... after ROW(`field1` DATATYPE,
>>>>>>>>> `field2` DATATYPE, ...) ) WITH ( 'connector' = 'kafka', 'format' =
>>>>>>>>> 'json', ... );
>>>>>>>>> Hope this helps!
>>>>>>>>>
>>>>>>>>> Marta
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>>>> [2]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>>>>> [3]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <
>>>>>>>>> chesnay@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> @Jark Would it be possible to use the 1.11 debezium support in
>>>>>>>>>> 1.10?
>>>>>>>>>>
>>>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'm trying to set up Flink with Debezium CDC Connector on AWS
>>>>>>>>>> EMR, however, EMR only supports Flink 1.10.0, whereas Debezium Connector
>>>>>>>>>> arrived in Flink 1.11.0, from looking at the documentation.
>>>>>>>>>>
>>>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>>>>
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>>>>
>>>>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>>>>> connecting Debezium to Flink? Is there an open source Debezium connector
>>>>>>>>>> that works with Flink 1.10.0? Could I potentially pull the code out for the
>>>>>>>>>> 1.11.0 Debezium connector and compile it in my project using Flink 1.10.0
>>>>>>>>>> api?
>>>>>>>>>>
>>>>>>>>>> For context, I plan on doing some fairly complicated long lived
>>>>>>>>>> stateful joins / materialization using the Table API over data ingested
>>>>>>>>>> from Postgres and possibly MySQL.
>>>>>>>>>>
>>>>>>>>>> Appreciate any help, thanks!
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>>
>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>
>>>>>>>>
>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>
>>>>>>
>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>> <https://www.facebook.com/remindhq>
>>>>>>
>>>>>
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Debezium Flink EMR

Posted by Rex Fenley <Re...@remind101.com>.
Awesome, so that took me a step further. When running, I'm receiving an
error, however. FYI, my docker-compose file is based on the Debezium MySQL
tutorial, which can be found here:
https://debezium.io/documentation/reference/1.2/tutorial.html

Part of the stack trace:

flink-jobmanager_1     | Caused by: java.io.IOException: Corrupt Debezium
JSON message
'{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
cool street","city":"Big
City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
flink-jobmanager_1     | at
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | Caused by: java.lang.NullPointerException
flink-jobmanager_1     | at
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115)
~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
~[flink-dist_2.12-1.11.1.jar:1.11.1]

On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <im...@gmail.com> wrote:

> Hi,
>
> This is a known issue in 1.11.0, and has been fixed in 1.11.1.
>
>
> Best,
> Jark
>
> On Fri, 28 Aug 2020 at 06:52, Rex Fenley <Re...@remind101.com> wrote:
>
>> Hi again!
>>
>> I'm testing things out locally in Docker on Flink 1.11 first to get my bearings
>> before downgrading to 1.10 and figuring out how to replace the Debezium
>> connector. However, I'm getting the following error
>> ```
>> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
>> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
>> ```
>>
>> Any suggestions for me to fix this?
>>
>> code:
>>
>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> val blinkStreamSettings =
>> EnvironmentSettings
>> .newInstance()
>> .useBlinkPlanner()
>> .inStreamingMode()
>> .build()
>> val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)
>>
>> // Table from Debezium mysql example docker:
>> //
>> +-------------+-------------------------------------+------+-----+---------+----------------+
>> // | Field | Type | Null | Key | Default | Extra |
>> //
>> +-------------+-------------------------------------+------+-----+---------+----------------+
>> // | id | int(11) | NO | PRI | NULL | auto_increment |
>> // | customer_id | int(11) | NO | MUL | NULL | |
>> // | street | varchar(255) | NO | | NULL | |
>> // | city | varchar(255) | NO | | NULL | |
>> // | state | varchar(255) | NO | | NULL | |
>> // | zip | varchar(255) | NO | | NULL | |
>> // | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | |
>> //
>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>
>> tableEnv.executeSql("""
>> CREATE TABLE topic_addresses (
>> -- schema is totally the same to the MySQL "addresses" table
>> id INT,
>> customer_id INT,
>> street STRING,
>> city STRING,
>> state STRING,
>> zip STRING,
>> type STRING,
>> PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>> 'connector' = 'kafka',
>> 'topic' = 'dbserver1.inventory.addresses',
>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>> 'properties.group.id' = 'testGroup',
>> 'format' = 'debezium-json' -- using debezium-json as the format
>> )
>> """)
>>
>> val table = tableEnv.from("topic_addresses").select($"*")
>>
>> // Defining a PK automatically puts it in Upsert mode, which we want.
>> // TODO: type should be a keyword, is that acceptable by the DDL?
>> tableEnv.executeSql("""
>> CREATE TABLE ESAddresses (
>> id INT,
>> customer_id INT,
>> street STRING,
>> city STRING,
>> state STRING,
>> zip STRING,
>> type STRING,
>> PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>> 'connector' = 'elasticsearch-7',
>> 'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
>> 'index' = 'flinkaddresses',
>> 'format' = 'json'
>> )
>> """)
>>
>> table.executeInsert("ESAddresses").print()
>>
>> Thanks!
>>
>> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <Re...@remind101.com> wrote:
>>
>>> Thanks!
>>>
>>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <im...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Regarding the performance difference, the proposed way will have one
>>>> more stateful operator (deduplication) than the native 1.11 cdc support.
>>>> The overhead of the deduplication operator is just similar to a simple
>>>> group by aggregate (max on each non-key column).
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <Re...@remind101.com> wrote:
>>>>
>>>>> Thank you so much for the help!
>>>>>
>>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <
>>>>> marta@ververica.com> wrote:
>>>>>
>>>>>> Yes — you'll get the full row in the payload; and you can also access
>>>>>> the change operation, which might be useful in your case.
>>>>>>
>>>>>> About performance, I'm summoning Kurt and @Jark Wu <ja...@apache.org> to
>>>>>> the thread, who will be able to give you a more complete answer and likely
>>>>>> also some optimization tips for your specific use case.
>>>>>>
>>>>>> Marta
>>>>>>
>>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <Re...@remind101.com> wrote:
>>>>>>
>>>>>>> Yup! This definitely helps and makes sense.
>>>>>>>
>>>>>>> The 'after' payload comes with all data from the row, right? So
>>>>>>> essentially, for inserts and updates I can insert/replace data by pk, for
>>>>>>> null values I just delete by pk, and then I can build out the rest of my
>>>>>>> joins like normal.
>>>>>>>
>>>>>>> Are there any performance implications of doing it this way that is
>>>>>>> different from the out-of-the-box 1.11 solution?
>>>>>>>
>>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
>>>>>>> marta@ververica.com> wrote:
>>>>>>>
>>>>>>>> Hi, Rex.
>>>>>>>>
>>>>>>>> Part of what enabled CDC support in Flink 1.11 was the refactoring
>>>>>>>> of the table source interfaces (FLIP-95 [1]), and the new ScanTableSource
>>>>>>>> [2], which allows to emit bounded/unbounded streams with insert, update and
>>>>>>>> delete rows.
>>>>>>>>
>>>>>>>> In theory, you could consume data generated with Debezium as
>>>>>>>> regular JSON-encoded events before Flink 1.11 — there just wasn't a
>>>>>>>> convenient way to really treat it as "changelog". As a workaround, what you
>>>>>>>> can do in Flink 1.10 is process these messages as JSON and extract the
>>>>>>>> "after" field from the payload, and then apply de-duplication [3] to keep
>>>>>>>> only the last row.
>>>>>>>>
>>>>>>>> The DDL for your source table would look something like:
>>>>>>>>
>>>>>>>> CREATE TABLE tablename ( *... * after ROW(`field1` DATATYPE,
>>>>>>>> `field2` DATATYPE, ...) ) WITH ( 'connector' = 'kafka', 'format' =
>>>>>>>> 'json', ... );
>>>>>>>> Hope this helps!
>>>>>>>>
>>>>>>>> Marta
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>>> [2]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>>>> [3]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <
>>>>>>>> chesnay@apache.org> wrote:
>>>>>>>>
>>>>>>>>> @Jark Would it be possible to use the 1.11 debezium support in
>>>>>>>>> 1.10?
>>>>>>>>>
>>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'm trying to set up Flink with Debezium CDC Connector on AWS EMR,
>>>>>>>>> however, EMR only supports Flink 1.10.0, whereas Debezium Connector arrived
>>>>>>>>> in Flink 1.11.0, from looking at the documentation.
>>>>>>>>>
>>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>>>
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>>>
>>>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>>>> connecting Debezium to Flink? Is there an open source Debezium connector
>>>>>>>>> that works with Flink 1.10.0? Could I potentially pull the code out for the
>>>>>>>>> 1.11.0 Debezium connector and compile it in my project using Flink 1.10.0
>>>>>>>>> api?
>>>>>>>>>
>>>>>>>>> For context, I plan on doing some fairly complicated long lived
>>>>>>>>> stateful joins / materialization using the Table API over data ingested
>>>>>>>>> from Postgres and possibly MySQL.
>>>>>>>>>
>>>>>>>>> Appreciate any help, thanks!
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>
>>>>>>>
>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>
>>>>>
>>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>>> <https://www.facebook.com/remindhq>
>>>>>
>>>>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Debezium Flink EMR

Posted by Jark Wu <im...@gmail.com>.
Hi,

This is a known issue in 1.11.0, and has been fixed in 1.11.1.
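
In practice that just means moving every Flink dependency to 1.11.1. A sketch
with sbt (only a sketch: it assumes the standard Scala 2.12 artifacts and that
you bundle the connectors/formats with your job jar):

```
// build.sbt -- bump all Flink artifacts to the patch release with the fix
val flinkVersion = "1.11.1"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-table-api-scala-bridge"   % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-planner-blink"      % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka"          % flinkVersion,
  "org.apache.flink" %% "flink-connector-elasticsearch7" % flinkVersion,
  "org.apache.flink"  % "flink-json"                     % flinkVersion
)
```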


Best,
Jark

On Fri, 28 Aug 2020 at 06:52, Rex Fenley <Re...@remind101.com> wrote:

> Hi again!
>
> I'm tested out locally in docker on Flink 1.11 first to get my bearings
> before downgrading to 1.10 and figuring out how to replace the Debezium
> connector. However, I'm getting the following error
> ```
> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
> ```
>
> Any suggestions for me to fix this?
>
> code:
>
> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val blinkStreamSettings =
> EnvironmentSettings
> .newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build()
> val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)
>
> // Table from Debezium mysql example docker:
> //
> +-------------+-------------------------------------+------+-----+---------+----------------+
> // | Field | Type | Null | Key | Default | Extra |
> //
> +-------------+-------------------------------------+------+-----+---------+----------------+
> // | id | int(11) | NO | PRI | NULL | auto_increment |
> // | customer_id | int(11) | NO | MUL | NULL | |
> // | street | varchar(255) | NO | | NULL | |
> // | city | varchar(255) | NO | | NULL | |
> // | state | varchar(255) | NO | | NULL | |
> // | zip | varchar(255) | NO | | NULL | |
> // | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | |
> //
> +-------------+-------------------------------------+------+-----+---------+----------------+
>
> tableEnv.executeSql("""
> CREATE TABLE topic_addresses (
> -- schema is totally the same to the MySQL "addresses" table
> id INT,
> customer_id INT,
> street STRING,
> city STRING,
> state STRING,
> zip STRING,
> type STRING,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'dbserver1.inventory.addresses',
> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
> 'properties.group.id' = 'testGroup',
> 'format' = 'debezium-json' -- using debezium-json as the format
> )
> """)
>
> val table = tableEnv.from("topic_addresses").select($"*")
>
> // Defining a PK automatically puts it in Upsert mode, which we want.
> // TODO: type should be a keyword, is that acceptable by the DDL?
> tableEnv.executeSql("""
> CREATE TABLE ESAddresses (
> id INT,
> customer_id INT,
> street STRING,
> city STRING,
> state STRING,
> zip STRING,
> type STRING,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'elasticsearch-7',
> 'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
> 'index' = 'flinkaddresses',
> 'format' = 'json'
> )
> """)
>
> table.executeInsert("ESAddresses").print()
>
> Thanks!
>
> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <Re...@remind101.com> wrote:
>
>> Thanks!
>>
>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <im...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Regarding the performance difference, the proposed way will have one
>>> more stateful operator (deduplication) than the native 1.11 cdc support.
>>> The overhead of the deduplication operator is just similar to a simple
>>> group by aggregate (max on each non-key column).
>>>
>>> Best,
>>> Jark
>>>
>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <Re...@remind101.com> wrote:
>>>
>>>> Thank you so much for the help!
>>>>
>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <ma...@ververica.com>
>>>> wrote:
>>>>
>>>>> Yes — you'll get the full row in the payload; and you can also access
>>>>> the change operation, which might be useful in your case.
>>>>>
>>>>> About performance, I'm summoning Kurt and @Jark Wu <ja...@apache.org> to
>>>>> the thread, who will be able to give you a more complete answer and likely
>>>>> also some optimization tips for your specific use case.
>>>>>
>>>>> Marta
>>>>>
>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <Re...@remind101.com> wrote:
>>>>>
>>>>>> Yup! This definitely helps and makes sense.
>>>>>>
>>>>>> The 'after' payload comes with all data from the row right? So
>>>>>> essentially inserts and updates I can insert/replace data by pk and null
>>>>>> values I just delete by pk, and then I can build out the rest of my joins
>>>>>> like normal.
>>>>>>
>>>>>> Are there any performance implications of doing it this way that is
>>>>>> different from the out-of-the-box 1.11 solution?
>>>>>>
>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
>>>>>> marta@ververica.com> wrote:
>>>>>>
>>>>>>> Hi, Rex.
>>>>>>>
>>>>>>> Part of what enabled CDC support in Flink 1.11 was the refactoring
>>>>>>> of the table source interfaces (FLIP-95 [1]), and the new ScanTableSource
>>>>>>> [2], which allows to emit bounded/unbounded streams with insert, update and
>>>>>>> delete rows.
>>>>>>>
>>>>>>> In theory, you could consume data generated with Debezium as regular
>>>>>>> JSON-encoded events before Flink 1.11 — there just wasn't a convenient way
>>>>>>> to really treat it as "changelog". As a workaround, what you can do in
>>>>>>> Flink 1.10 is process these messages as JSON and extract the "after" field
>>>>>>> from the payload, and then apply de-duplication [3] to keep only the last
>>>>>>> row.
>>>>>>>
>>>>>>> The DDL for your source table would look something like:
>>>>>>>
>>>>>>> CREATE TABLE tablename ( *... * after ROW(`field1` DATATYPE,
>>>>>>> `field2` DATATYPE, ...) ) WITH ( 'connector' = 'kafka', 'format' =
>>>>>>> 'json', ... );
>>>>>>> Hope this helps!
>>>>>>>
>>>>>>> Marta
>>>>>>>
>>>>>>> [1]
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>> [2]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>>> [3]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <
>>>>>>> chesnay@apache.org> wrote:
>>>>>>>
>>>>>>>> @Jark Would it be possible to use the 1.11 debezium support in 1.10?
>>>>>>>>
>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm trying to set up Flink with Debezium CDC Connector on AWS EMR,
>>>>>>>> however, EMR only supports Flink 1.10.0, whereas Debezium Connector arrived
>>>>>>>> in Flink 1.11.0, from looking at the documentation.
>>>>>>>>
>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>>
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>>
>>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>>> connecting Debezium to Flink? Is there an open source Debezium connector
>>>>>>>> that works with Flink 1.10.0? Could I potentially pull the code out for the
>>>>>>>> 1.11.0 Debezium connector and compile it in my project using Flink 1.10.0
>>>>>>>> api?
>>>>>>>>
>>>>>>>> For context, I plan on doing some fairly complicated long lived
>>>>>>>> stateful joins / materialization using the Table API over data ingested
>>>>>>>> from Postgres and possibly MySQL.
>>>>>>>>
>>>>>>>> Appreciate any help, thanks!
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>
>>>>>>>>
>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>
>>>>>>
>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>> <https://www.facebook.com/remindhq>
>>>>>>
>>>>>
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>

Re: Debezium Flink EMR

Posted by Rex Fenley <Re...@remind101.com>.
Hi again!

I'm testing this out locally in Docker on Flink 1.11 first to get my bearings
before downgrading to 1.10 and figuring out how to replace the Debezium
connector. However, I'm getting the following error:
```
Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
[ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
```

Any suggestions for me to fix this?

code:

// Imports assumed for this snippet (Scala Table API, Blink planner, Flink 1.11):
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkStreamSettings =
  EnvironmentSettings
    .newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build()
val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)

// Table from the Debezium MySQL example docker image:
// +-------------+-------------------------------------+------+-----+---------+----------------+
// | Field | Type | Null | Key | Default | Extra |
// +-------------+-------------------------------------+------+-----+---------+----------------+
// | id | int(11) | NO | PRI | NULL | auto_increment |
// | customer_id | int(11) | NO | MUL | NULL | |
// | street | varchar(255) | NO | | NULL | |
// | city | varchar(255) | NO | | NULL | |
// | state | varchar(255) | NO | | NULL | |
// | zip | varchar(255) | NO | | NULL | |
// | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | |
// +-------------+-------------------------------------+------+-----+---------+----------------+

tableEnv.executeSql("""
  CREATE TABLE topic_addresses (
    -- schema is the same as the MySQL "addresses" table
    id INT,
    customer_id INT,
    street STRING,
    city STRING,
    state STRING,
    zip STRING,
    type STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'dbserver1.inventory.addresses',
    'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'debezium-json' -- using debezium-json as the format
  )
""")

val table = tableEnv.from("topic_addresses").select($"*")

// Defining a PK automatically puts it in Upsert mode, which we want.
// TODO: type should be a keyword, is that acceptable by the DDL?
tableEnv.executeSql("""
  CREATE TABLE ESAddresses (
    id INT,
    customer_id INT,
    street STRING,
    city STRING,
    state STRING,
    zip STRING,
    type STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
    'index' = 'flinkaddresses',
    'format' = 'json'
  )
""")

table.executeInsert("ESAddresses").print()

Thanks!

On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <Re...@remind101.com> wrote:

> Thanks!
>
> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <im...@gmail.com> wrote:
>
>> Hi,
>>
>> Regarding the performance difference, the proposed way will have one more
>> stateful operator (deduplication) than the native 1.11 cdc support.
>> The overhead of the deduplication operator is just similar to a simple
>> group by aggregate (max on each non-key column).
>>
>> Best,
>> Jark
>>
>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <Re...@remind101.com> wrote:
>>
>>> Thank you so much for the help!
>>>
>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <ma...@ververica.com>
>>> wrote:
>>>
>>>> Yes — you'll get the full row in the payload; and you can also access
>>>> the change operation, which might be useful in your case.
>>>>
>>>> About performance, I'm summoning Kurt and @Jark Wu <ja...@apache.org> to
>>>> the thread, who will be able to give you a more complete answer and likely
>>>> also some optimization tips for your specific use case.
>>>>
>>>> Marta
>>>>
>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <Re...@remind101.com> wrote:
>>>>
>>>>> Yup! This definitely helps and makes sense.
>>>>>
>>>>> The 'after' payload comes with all data from the row right? So
>>>>> essentially inserts and updates I can insert/replace data by pk and null
>>>>> values I just delete by pk, and then I can build out the rest of my joins
>>>>> like normal.
>>>>>
>>>>> Are there any performance implications of doing it this way that is
>>>>> different from the out-of-the-box 1.11 solution?
>>>>>
>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
>>>>> marta@ververica.com> wrote:
>>>>>
>>>>>> Hi, Rex.
>>>>>>
>>>>>> Part of what enabled CDC support in Flink 1.11 was the refactoring of
>>>>>> the table source interfaces (FLIP-95 [1]), and the new ScanTableSource
>>>>>> [2], which allows to emit bounded/unbounded streams with insert, update and
>>>>>> delete rows.
>>>>>>
>>>>>> In theory, you could consume data generated with Debezium as regular
>>>>>> JSON-encoded events before Flink 1.11 — there just wasn't a convenient way
>>>>>> to really treat it as "changelog". As a workaround, what you can do in
>>>>>> Flink 1.10 is process these messages as JSON and extract the "after" field
>>>>>> from the payload, and then apply de-duplication [3] to keep only the last
>>>>>> row.
>>>>>>
>>>>>> The DDL for your source table would look something like:
>>>>>>
>>>>>> CREATE TABLE tablename ( *... * after ROW(`field1` DATATYPE,
>>>>>> `field2` DATATYPE, ...) ) WITH ( 'connector' = 'kafka', 'format' =
>>>>>> 'json', ... );
>>>>>> Hope this helps!
>>>>>>
>>>>>> Marta
>>>>>>
>>>>>> [1]
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>> [3]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>
>>>>>>
>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <ch...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> @Jark Would it be possible to use the 1.11 debezium support in 1.10?
>>>>>>>
>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm trying to set up Flink with Debezium CDC Connector on AWS EMR,
>>>>>>> however, EMR only supports Flink 1.10.0, whereas Debezium Connector arrived
>>>>>>> in Flink 1.11.0, from looking at the documentation.
>>>>>>>
>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>
>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>> connecting Debezium to Flink? Is there an open source Debezium connector
>>>>>>> that works with Flink 1.10.0? Could I potentially pull the code out for the
>>>>>>> 1.11.0 Debezium connector and compile it in my project using Flink 1.10.0
>>>>>>> api?
>>>>>>>
>>>>>>> For context, I plan on doing some fairly complicated long lived
>>>>>>> stateful joins / materialization using the Table API over data ingested
>>>>>>> from Postgres and possibly MySQL.
>>>>>>>
>>>>>>> Appreciate any help, thanks!
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>
>>>>>>>
>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>
>>>>>
>>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>>> <https://www.facebook.com/remindhq>
>>>>>
>>>>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Debezium Flink EMR

Posted by Rex Fenley <Re...@remind101.com>.
Thanks!

On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <im...@gmail.com> wrote:

> Hi,
>
> Regarding the performance difference, the proposed way will have one more
> stateful operator (deduplication) than the native 1.11 cdc support.
> The overhead of the deduplication operator is just similar to a simple
> group by aggregate (max on each non-key column).
>
> Best,
> Jark
>
> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <Re...@remind101.com> wrote:
>
>> Thank you so much for the help!
>>
>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <ma...@ververica.com>
>> wrote:
>>
>>> Yes — you'll get the full row in the payload; and you can also access
>>> the change operation, which might be useful in your case.
>>>
>>> About performance, I'm summoning Kurt and @Jark Wu <ja...@apache.org> to
>>> the thread, who will be able to give you a more complete answer and likely
>>> also some optimization tips for your specific use case.
>>>
>>> Marta
>>>
>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <Re...@remind101.com> wrote:
>>>
>>>> Yup! This definitely helps and makes sense.
>>>>
>>>> The 'after' payload comes with all data from the row right? So
>>>> essentially inserts and updates I can insert/replace data by pk and null
>>>> values I just delete by pk, and then I can build out the rest of my joins
>>>> like normal.
>>>>
>>>> Are there any performance implications of doing it this way that is
>>>> different from the out-of-the-box 1.11 solution?
>>>>
>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <ma...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi, Rex.
>>>>>
>>>>> Part of what enabled CDC support in Flink 1.11 was the refactoring of
>>>>> the table source interfaces (FLIP-95 [1]), and the new ScanTableSource
>>>>> [2], which allows to emit bounded/unbounded streams with insert, update and
>>>>> delete rows.
>>>>>
>>>>> In theory, you could consume data generated with Debezium as regular
>>>>> JSON-encoded events before Flink 1.11 — there just wasn't a convenient way
>>>>> to really treat it as "changelog". As a workaround, what you can do in
>>>>> Flink 1.10 is process these messages as JSON and extract the "after" field
>>>>> from the payload, and then apply de-duplication [3] to keep only the last
>>>>> row.
>>>>>
>>>>> The DDL for your source table would look something like:
>>>>>
>>>>> CREATE TABLE tablename ( *... * after ROW(`field1` DATATYPE, `field2`
>>>>> DATATYPE, ...) ) WITH ( 'connector' = 'kafka', 'format' = 'json', ...
>>>>> );
>>>>> Hope this helps!
>>>>>
>>>>> Marta
>>>>>
>>>>> [1]
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>> [3]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>
>>>>>
>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <ch...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> @Jark Would it be possible to use the 1.11 debezium support in 1.10?
>>>>>>
>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm trying to set up Flink with Debezium CDC Connector on AWS EMR,
>>>>>> however, EMR only supports Flink 1.10.0, whereas Debezium Connector arrived
>>>>>> in Flink 1.11.0, from looking at the documentation.
>>>>>>
>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>
>>>>>> I'm wondering what alternative solutions are available for connecting
>>>>>> Debezium to Flink? Is there an open source Debezium connector that works
>>>>>> with Flink 1.10.0? Could I potentially pull the code out for the 1.11.0
>>>>>> Debezium connector and compile it in my project using Flink 1.10.0 api?
>>>>>>
>>>>>> For context, I plan on doing some fairly complicated long lived
>>>>>> stateful joins / materialization using the Table API over data ingested
>>>>>> from Postgres and possibly MySQL.
>>>>>>
>>>>>> Appreciate any help, thanks!
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>
>>>>>>
>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>> <https://www.facebook.com/remindhq>
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Debezium Flink EMR

Posted by Jark Wu <im...@gmail.com>.
Hi,

Regarding the performance difference: the proposed way will have one more
stateful operator (deduplication) than the native 1.11 CDC support.
The overhead of the deduplication operator is similar to that of a simple
group-by aggregate (max on each non-key column).

Best,
Jark

On Tue, 25 Aug 2020 at 02:21, Rex Fenley <Re...@remind101.com> wrote:

> Thank you so much for the help!
>
> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <ma...@ververica.com>
> wrote:
>
>> Yes — you'll get the full row in the payload; and you can also access the
>> change operation, which might be useful in your case.
>>
>> About performance, I'm summoning Kurt and @Jark Wu <ja...@apache.org> to
>> the thread, who will be able to give you a more complete answer and likely
>> also some optimization tips for your specific use case.
>>
>> Marta
>>
>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <Re...@remind101.com> wrote:
>>
>>> Yup! This definitely helps and makes sense.
>>>
>>> The 'after' payload comes with all data from the row right? So
>>> essentially inserts and updates I can insert/replace data by pk and null
>>> values I just delete by pk, and then I can build out the rest of my joins
>>> like normal.
>>>
>>> Are there any performance implications of doing it this way that is
>>> different from the out-of-the-box 1.11 solution?
>>>
>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <ma...@ververica.com>
>>> wrote:
>>>
>>>> Hi, Rex.
>>>>
>>>> Part of what enabled CDC support in Flink 1.11 was the refactoring of
>>>> the table source interfaces (FLIP-95 [1]), and the new ScanTableSource
>>>> [2], which allows to emit bounded/unbounded streams with insert, update and
>>>> delete rows.
>>>>
>>>> In theory, you could consume data generated with Debezium as regular
>>>> JSON-encoded events before Flink 1.11 — there just wasn't a convenient way
>>>> to really treat it as "changelog". As a workaround, what you can do in
>>>> Flink 1.10 is process these messages as JSON and extract the "after" field
>>>> from the payload, and then apply de-duplication [3] to keep only the last
>>>> row.
>>>>
>>>> The DDL for your source table would look something like:
>>>>
>>>> CREATE TABLE tablename ( *... * after ROW(`field1` DATATYPE, `field2`
>>>> DATATYPE, ...) ) WITH ( 'connector' = 'kafka', 'format' = 'json', ...
>>>> );
>>>> Hope this helps!
>>>>
>>>> Marta
>>>>
>>>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>> [3]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>
>>>>
>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <ch...@apache.org>
>>>> wrote:
>>>>
>>>>> @Jark Would it be possible to use the 1.11 debezium support in 1.10?
>>>>>
>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying to set up Flink with Debezium CDC Connector on AWS EMR,
>>>>> however, EMR only supports Flink 1.10.0, whereas Debezium Connector arrived
>>>>> in Flink 1.11.0, from looking at the documentation.
>>>>>
>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>
>>>>> I'm wondering what alternative solutions are available for connecting
>>>>> Debezium to Flink? Is there an open source Debezium connector that works
>>>>> with Flink 1.10.0? Could I potentially pull the code out for the 1.11.0
>>>>> Debezium connector and compile it in my project using Flink 1.10.0 api?
>>>>>
>>>>> For context, I plan on doing some fairly complicated long lived
>>>>> stateful joins / materialization using the Table API over data ingested
>>>>> from Postgres and possibly MySQL.
>>>>>
>>>>> Appreciate any help, thanks!
>>>>>
>>>>> --
>>>>>
>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>
>>>>>
>>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>>> <https://www.facebook.com/remindhq>
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>

Re: Debezium Flink EMR

Posted by Rex Fenley <Re...@remind101.com>.
Thank you so much for the help!

On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <ma...@ververica.com>
wrote:

> Yes — you'll get the full row in the payload; and you can also access the
> change operation, which might be useful in your case.
>
> About performance, I'm summoning Kurt and @Jark Wu <ja...@apache.org> to
> the thread, who will be able to give you a more complete answer and likely
> also some optimization tips for your specific use case.
>
> Marta
>
> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <Re...@remind101.com> wrote:
>
>> Yup! This definitely helps and makes sense.
>>
>> The 'after' payload comes with all data from the row right? So
>> essentially inserts and updates I can insert/replace data by pk and null
>> values I just delete by pk, and then I can build out the rest of my joins
>> like normal.
>>
>> Are there any performance implications of doing it this way that is
>> different from the out-of-the-box 1.11 solution?
>>
>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <ma...@ververica.com>
>> wrote:
>>
>>> Hi, Rex.
>>>
>>> Part of what enabled CDC support in Flink 1.11 was the refactoring of
>>> the table source interfaces (FLIP-95 [1]), and the new ScanTableSource
>>> [2], which allows to emit bounded/unbounded streams with insert, update and
>>> delete rows.
>>>
>>> In theory, you could consume data generated with Debezium as regular
>>> JSON-encoded events before Flink 1.11 — there just wasn't a convenient way
>>> to really treat it as "changelog". As a workaround, what you can do in
>>> Flink 1.10 is process these messages as JSON and extract the "after" field
>>> from the payload, and then apply de-duplication [3] to keep only the last
>>> row.
>>>
>>> The DDL for your source table would look something like:
>>>
>>> CREATE TABLE tablename ( *... * after ROW(`field1` DATATYPE, `field2`
>>> DATATYPE, ...) ) WITH ( 'connector' = 'kafka', 'format' = 'json', ... );
>>> Hope this helps!
>>>
>>> Marta
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>
>>>
>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <ch...@apache.org>
>>> wrote:
>>>
>>>> @Jark Would it be possible to use the 1.11 debezium support in 1.10?
>>>>
>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm trying to set up Flink with Debezium CDC Connector on AWS EMR,
>>>> however, EMR only supports Flink 1.10.0, whereas Debezium Connector arrived
>>>> in Flink 1.11.0, from looking at the documentation.
>>>>
>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>
>>>> I'm wondering what alternative solutions are available for connecting
>>>> Debezium to Flink? Is there an open source Debezium connector that works
>>>> with Flink 1.10.0? Could I potentially pull the code out for the 1.11.0
>>>> Debezium connector and compile it in my project using Flink 1.10.0 api?
>>>>
>>>> For context, I plan on doing some fairly complicated long lived
>>>> stateful joins / materialization using the Table API over data ingested
>>>> from Postgres and possibly MySQL.
>>>>
>>>> Appreciate any help, thanks!
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>>
>>>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Debezium Flink EMR

Posted by Marta Paes Moreira <ma...@ververica.com>.
Yes — you'll get the full row in the payload; and you can also access the
change operation, which might be useful in your case.
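
(For reference, a Debezium update event is shaped roughly like the snippet
below; the values are made up, and depending on your converter settings the
envelope may additionally be wrapped in a "schema"/"payload" pair. The "op"
field is "c" for create, "u" for update, "d" for delete and "r" for snapshot
reads.)

```
{
  "before": { "field1": 1, "field2": "old value", ... },
  "after":  { "field1": 1, "field2": "new value", ... },
  "source": { "connector": "mysql", "table": "tablename", ... },
  "op": "u",
  "ts_ms": 1598000000000
}
```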

About performance, I'm summoning Kurt and @Jark Wu <ja...@apache.org> to the
thread, who will be able to give you a more complete answer and likely also
some optimization tips for your specific use case.

Marta

On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <Re...@remind101.com> wrote:

> Yup! This definitely helps and makes sense.
>
> The 'after' payload comes with all data from the row right? So essentially
> inserts and updates I can insert/replace data by pk and null values I just
> delete by pk, and then I can build out the rest of my joins like normal.
>
> Are there any performance implications of doing it this way that is
> different from the out-of-the-box 1.11 solution?
>
> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <ma...@ververica.com>
> wrote:
>
>> Hi, Rex.
>>
>> Part of what enabled CDC support in Flink 1.11 was the refactoring of the
>> table source interfaces (FLIP-95 [1]), and the new ScanTableSource
>> [2], which allows to emit bounded/unbounded streams with insert, update and
>> delete rows.
>>
>> In theory, you could consume data generated with Debezium as regular
>> JSON-encoded events before Flink 1.11 — there just wasn't a convenient way
>> to really treat it as "changelog". As a workaround, what you can do in
>> Flink 1.10 is process these messages as JSON and extract the "after" field
>> from the payload, and then apply de-duplication [3] to keep only the last
>> row.
>>
>> The DDL for your source table would look something like:
>>
>> CREATE TABLE tablename ( *... * after ROW(`field1` DATATYPE, `field2`
>> DATATYPE, ...) ) WITH ( 'connector' = 'kafka', 'format' = 'json', ... );
>> Hope this helps!
>>
>> Marta
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>
>>
>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <ch...@apache.org>
>> wrote:
>>
>>> @Jark Would it be possible to use the 1.11 debezium support in 1.10?
>>>
>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to set up Flink with Debezium CDC Connector on AWS EMR,
>>> however, EMR only supports Flink 1.10.0, whereas Debezium Connector arrived
>>> in Flink 1.11.0, from looking at the documentation.
>>>
>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>
>>> I'm wondering what alternative solutions are available for connecting
>>> Debezium to Flink? Is there an open source Debezium connector that works
>>> with Flink 1.10.0? Could I potentially pull the code out for the 1.11.0
>>> Debezium connector and compile it in my project using Flink 1.10.0 api?
>>>
>>> For context, I plan on doing some fairly complicated long lived stateful
>>> joins / materialization using the Table API over data ingested from
>>> Postgres and possibly MySQL.
>>>
>>> Appreciate any help, thanks!
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>>
>>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>

Re: Debezium Flink EMR

Posted by Rex Fenley <Re...@remind101.com>.
Yup! This definitely helps and makes sense.

The 'after' payload comes with all data from the row, right? So essentially,
for inserts and updates I can insert/replace data by pk, for null values I can
just delete by pk, and then I can build out the rest of my joins like normal.

Are there any performance implications of doing it this way compared to the
out-of-the-box 1.11 solution?

On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <ma...@ververica.com>
wrote:

> Hi, Rex.
>
> Part of what enabled CDC support in Flink 1.11 was the refactoring of the
> table source interfaces (FLIP-95 [1]), and the new ScanTableSource
> [2], which allows to emit bounded/unbounded streams with insert, update and
> delete rows.
>
> In theory, you could consume data generated with Debezium as regular
> JSON-encoded events before Flink 1.11 — there just wasn't a convenient way
> to really treat it as "changelog". As a workaround, what you can do in
> Flink 1.10 is process these messages as JSON and extract the "after" field
> from the payload, and then apply de-duplication [3] to keep only the last
> row.
>
> The DDL for your source table would look something like:
>
> CREATE TABLE tablename ( *... * after ROW(`field1` DATATYPE, `field2`
> DATATYPE, ...) ) WITH ( 'connector' = 'kafka', 'format' = 'json', ... );
> Hope this helps!
>
> Marta
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>
>
> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <ch...@apache.org>
> wrote:
>
>> @Jark Would it be possible to use the 1.11 debezium support in 1.10?
>>
>> On 20/08/2020 19:59, Rex Fenley wrote:
>>
>> Hi,
>>
>> I'm trying to set up Flink with Debezium CDC Connector on AWS EMR,
>> however, EMR only supports Flink 1.10.0, whereas Debezium Connector arrived
>> in Flink 1.11.0, from looking at the documentation.
>>
>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>
>> I'm wondering what alternative solutions are available for connecting
>> Debezium to Flink? Is there an open source Debezium connector that works
>> with Flink 1.10.0? Could I potentially pull the code out for the 1.11.0
>> Debezium connector and compile it in my project using Flink 1.10.0 api?
>>
>> For context, I plan on doing some fairly complicated long lived stateful
>> joins / materialization using the Table API over data ingested from
>> Postgres and possibly MySQL.
>>
>> Appreciate any help, thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>>
>>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Debezium Flink EMR

Posted by Marta Paes Moreira <ma...@ververica.com>.
Hi, Rex.

Part of what enabled CDC support in Flink 1.11 was the refactoring of the
table source interfaces (FLIP-95 [1]) and the new ScanTableSource [2],
which makes it possible to emit bounded/unbounded streams with insert,
update and delete rows.
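
(As a rough illustration of what the new interfaces add: a scan source now
declares which kinds of rows it emits through a ChangelogMode. A minimal
sketch against the 1.11 API:)

```
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.RowKind

// A CDC-style source advertises inserts, updates (before/after) and deletes.
val cdcChangelogMode: ChangelogMode =
  ChangelogMode.newBuilder()
    .addContainedKind(RowKind.INSERT)
    .addContainedKind(RowKind.UPDATE_BEFORE)
    .addContainedKind(RowKind.UPDATE_AFTER)
    .addContainedKind(RowKind.DELETE)
    .build()
```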

In theory, you could consume data generated with Debezium as regular
JSON-encoded events before Flink 1.11 — there just wasn't a convenient way
to really treat it as "changelog". As a workaround, what you can do in
Flink 1.10 is process these messages as JSON and extract the "after" field
from the payload, and then apply de-duplication [3] to keep only the last
row.

The DDL for your source table would look something like:

CREATE TABLE tablename (
  ...
  after ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  ...
);
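
The de-duplication step could then look roughly like this (only a sketch: it
assumes `field1` is the key, that the source table also declares a
processing-time attribute such as `proctime AS PROCTIME()`, and that delete
events, where "after" is null, are handled separately):

```
// Keep only the newest "after" image per key (the pattern from [3]).
val latest = tableEnv.sqlQuery("""
  SELECT `field1`, `field2`
  FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY `field1` ORDER BY proctime DESC) AS row_num
    FROM (
      SELECT `after`.`field1` AS `field1`,
             `after`.`field2` AS `field2`,
             proctime
      FROM tablename
    )
  )
  WHERE row_num = 1
""")
```
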
Hope this helps!

Marta

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication


On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <ch...@apache.org>
wrote:

> @Jark Would it be possible to use the 1.11 debezium support in 1.10?
>
> On 20/08/2020 19:59, Rex Fenley wrote:
>
> Hi,
>
> I'm trying to set up Flink with Debezium CDC Connector on AWS EMR,
> however, EMR only supports Flink 1.10.0, whereas Debezium Connector arrived
> in Flink 1.11.0, from looking at the documentation.
>
> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>
> I'm wondering what alternative solutions are available for connecting
> Debezium to Flink? Is there an open source Debezium connector that works
> with Flink 1.10.0? Could I potentially pull the code out for the 1.11.0
> Debezium connector and compile it in my project using Flink 1.10.0 api?
>
> For context, I plan on doing some fairly complicated long lived stateful
> joins / materialization using the Table API over data ingested from
> Postgres and possibly MySQL.
>
> Appreciate any help, thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>
>
>

Re: Debezium Flink EMR

Posted by Chesnay Schepler <ch...@apache.org>.
@Jark Would it be possible to use the 1.11 debezium support in 1.10?

On 20/08/2020 19:59, Rex Fenley wrote:
> Hi,
>
> I'm trying to set up Flink with Debezium CDC Connector on AWS EMR, 
> however, EMR only supports Flink 1.10.0, whereas Debezium Connector 
> arrived in Flink 1.11.0, from looking at the documentation.
>
> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>
> I'm wondering what alternative solutions are available for connecting 
> Debezium to Flink? Is there an open source Debezium connector that 
> works with Flink 1.10.0? Could I potentially pull the code out for the 
> 1.11.0 Debezium connector and compile it in my project using Flink 
> 1.10.0 api?
>
> For context, I plan on doing some fairly complicated long lived 
> stateful joins / materialization using the Table API over data 
> ingested from Postgres and possibly MySQL.
>
> Appreciate any help, thanks!
>
> -- 
>
> Rex Fenley|Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> | 
> FOLLOW US <https://twitter.com/remindhq> | LIKE US 
> <https://www.facebook.com/remindhq>
>