Posted to user@flink.apache.org by Marta Paes Moreira <ma...@ververica.com> on 2020/09/01 00:06:18 UTC

Re: Debezium Flink EMR

Hey, Rex!

This is likely due to the tombstone records that Debezium produces for
DELETE operations (i.e. a record with the same key as the deleted row and a
value of null). These are markers for Kafka to indicate that log compaction
can remove all records for the given key, and the initial implementation of
the debezium-format can't handle them. This issue is already documented
(and solved) in [1].

In the meantime, can you try adding "tombstones.on.delete": "false" to the
configuration of your Debezium MySQL connector?

Marta
[1] https://issues.apache.org/jira/browse/FLINK-18705
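For reference, a minimal sketch of where that option goes in the Kafka Connect
connector registration (the surrounding settings just follow the defaults from
the Debezium MySQL tutorial mentioned later in this thread and are illustrative;
the server-id and schema-history settings are omitted for brevity):

  {
    "name": "inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.name": "dbserver1",
      "database.whitelist": "inventory",
      "tombstones.on.delete": "false"
    }
  }

With that in place, Debezium still emits the delete event itself; it only stops
emitting the extra null-valued tombstone record afterwards.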

On Tue, Sep 1, 2020 at 1:36 AM Rex Fenley <Re...@remind101.com> wrote:

> Hi, getting so close but ran into another issue:
>
> Flink successfully reads changes from Debezium/Kafka and writes them to
> Elasticsearch, but there's a problem with deletions. When I DELETE a row
> from MySQL the deletion makes it successfully all the way to Elasticsearch
> which is great, but then the taskmanager suddenly dies with a null pointer
> exception. Inserts and Updates do not have the same problem. This seems
> very odd. Any help would be much appreciated. Thanks!
>
> flink-taskmanager_1    | 2020-08-31 23:30:33,684 WARN
>  org.apache.flink.runtime.taskmanager.Task                    [] - Source:
> TableSourceScan(table=[[default_catalog, default_database,
> topic_addresses]], fields=[id, customer_id, street, city, state, zip,
> type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
> fields=[id, customer_id, street, city, state, zip, type]) (1/2)
> (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED.
> flink-taskmanager_1    | java.lang.NullPointerException: null
> flink-taskmanager_1    | at java.lang.String.<init>(String.java:566)
> ~[?:1.8.0_265]
> flink-taskmanager_1    | at
> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
> ~[flink-json-1.11.1.jar:1.11.1]
> flink-taskmanager_1    | at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1    | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1    | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1    | at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1    | at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-taskmanager_1    | at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-taskmanager_1    | at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-taskmanager_1    | 2020-08-31 23:30:33,720 INFO
>  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing
> task resources for Source: TableSourceScan(table=[[default_catalog,
> default_database, topic_addresses]], fields=[id, customer_id, street, city,
> state, zip, type]) -> Sink:
> Sink(table=[default_catalog.default_database.ESAddresses], fields=[id,
> customer_id, street, city, state, zip, type]) (1/2)
> (2b79917cb528f37fad7f636740d2fdd8).
> flink-taskmanager_1    | 2020-08-31 23:30:33,728 INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] -
> Un-registering task and sending final execution state FAILED to JobManager
> for task Source: TableSourceScan(table=[[default_catalog, default_database,
> topic_addresses]], fields=[id, customer_id, street, city, state, zip,
> type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
> fields=[id, customer_id, street, city, state, zip, type]) (1/2)
> 2b79917cb528f37fad7f636740d2fdd8.
> flink-jobmanager_1     | 2020-08-31 23:30:33,770 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
> TableSourceScan(table=[[default_catalog, default_database,
> topic_addresses]], fields=[id, customer_id, street, city, state, zip,
> type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
> fields=[id, customer_id, street, city, state, zip, type]) (1/2)
> (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED on
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2e246b35.
> flink-jobmanager_1     | java.lang.NullPointerException: null
> flink-jobmanager_1     | at java.lang.String.<init>(String.java:566)
> ~[?:1.8.0_265]
> flink-jobmanager_1     | at
> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
> ~[flink-json-1.11.1.jar:1.11.1]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> ~[?:?]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-jobmanager_1     | at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>
> On Mon, Aug 31, 2020 at 12:27 PM Rex Fenley <Re...@remind101.com> wrote:
>
>> Ah, my bad, thanks for pointing that out Arvid!
>>
>> On Mon, Aug 31, 2020 at 12:00 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Rex,
>>>
>>> you still forgot
>>>
>>> 'debezium-json.schema-include' = 'true'
>>>
>>> Please reread my mail.
>>>
>>>
>>> On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley <Re...@remind101.com> wrote:
>>>
>>>> Thanks for the input, though I've certainly included a schema as is
>>>> reflected earlier in this thread. Including here again
>>>> ...
>>>> tableEnv.executeSql("""
>>>> CREATE TABLE topic_addresses (
>>>>   -- schema is totally the same to the MySQL "addresses" table
>>>>   id INT,
>>>>   customer_id INT,
>>>>   street STRING,
>>>>   city STRING,
>>>>   state STRING,
>>>>   zip STRING,
>>>>   type STRING,
>>>>   PRIMARY KEY (id) NOT ENFORCED
>>>> ) WITH (
>>>>   'connector' = 'kafka',
>>>>   'topic' = 'dbserver1.inventory.addresses',
>>>>   'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>>   'properties.group.id' = 'testGroup',
>>>>   'format' = 'debezium-json' -- using debezium-json as the format
>>>> )
>>>> """)
>>>>
>>>> val table = tableEnv.from("topic_addresses").select($"*")
>>>> ...
>>>>
>>>> On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi Rex,
>>>>>
>>>>> the connector expects a value without a schema, but the message
>>>>> contains a schema. You can tell Flink that the schema is included as
>>>>> written in the documentation [1].
>>>>>
>>>>> CREATE TABLE topic_products (
>>>>>   -- schema is totally the same to the MySQL "products" table
>>>>>   id BIGINT,
>>>>>   name STRING,
>>>>>   description STRING,
>>>>>   weight DECIMAL(10, 2)
>>>>> ) WITH (
>>>>>  'connector' = 'kafka',
>>>>>  'topic' = 'products_binlog',
>>>>>  'properties.bootstrap.servers' = 'localhost:9092',
>>>>>  'properties.group.id' = 'testGroup',
>>>>>  'format' = 'debezium-json',
>>>>>  'debezium-json.schema-include' = 'true')
>>>>>
>>>>> @Jark Wu <im...@gmail.com>, it would probably be good to make the
>>>>> connector more robust and catch these types of misconfigurations.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
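>>>>>
>>>>> Concretely, for the topic_addresses table defined earlier in this thread,
>>>>> the fix is one extra option in its WITH clause (a sketch; the column list
>>>>> and the other options stay exactly as before):
>>>>>
>>>>> ) WITH (
>>>>>   'connector' = 'kafka',
>>>>>   'topic' = 'dbserver1.inventory.addresses',
>>>>>   'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>>>   'properties.group.id' = 'testGroup',
>>>>>   'format' = 'debezium-json',
>>>>>   -- the Kafka messages carry the Debezium schema, so tell the format
>>>>>   'debezium-json.schema-include' = 'true'
>>>>> )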
>>>>>
>>>>> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <Re...@remind101.com> wrote:
>>>>>
>>>>>> Awesome, so that took me a step further. When running, I'm receiving an
>>>>>> error, however. FYI, my docker-compose file is based on the Debezium
>>>>>> MySQL tutorial, which can be found here:
>>>>>> https://debezium.io/documentation/reference/1.2/tutorial.html
>>>>>>
>>>>>> Part of the stack trace:
>>>>>>
>>>>>> flink-jobmanager_1     | Caused by: java.io.IOException: Corrupt
>>>>>> Debezium JSON message
>>>>>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
>>>>>> cool street","city":"Big
>>>>>> City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
>>>>>> ~[flink-json-1.11.1.jar:1.11.1]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>> flink-jobmanager_1     | Caused by: java.lang.NullPointerException
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115)
>>>>>> ~[flink-json-1.11.1.jar:1.11.1]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>>>>> ~[?:?]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>> flink-jobmanager_1     | at
>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>>
>>>>>> On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <im...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> This is a known issue in 1.11.0, and has been fixed in 1.11.1.
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Fri, 28 Aug 2020 at 06:52, Rex Fenley <Re...@remind101.com> wrote:
>>>>>>>
>>>>>>>> Hi again!
>>>>>>>>
>>>>>>>> I'm testing things out locally in Docker on Flink 1.11 first to get my
>>>>>>>> bearings before downgrading to 1.10 and figuring out how to replace the
>>>>>>>> Debezium connector. However, I'm getting the following error:
>>>>>>>> ```
>>>>>>>> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
>>>>>>>> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
>>>>>>>> ```
>>>>>>>>
>>>>>>>> Any suggestions for me to fix this?
>>>>>>>>
>>>>>>>> code:
>>>>>>>>
>>>>>>>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>> val blinkStreamSettings = EnvironmentSettings
>>>>>>>>   .newInstance()
>>>>>>>>   .useBlinkPlanner()
>>>>>>>>   .inStreamingMode()
>>>>>>>>   .build()
>>>>>>>> val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)
>>>>>>>>
>>>>>>>> // Table from Debezium mysql example docker:
>>>>>>>> // +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>>> // | Field       | Type                                | Null | Key | Default | Extra          |
>>>>>>>> // +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>>> // | id          | int(11)                             | NO   | PRI | NULL    | auto_increment |
>>>>>>>> // | customer_id | int(11)                             | NO   | MUL | NULL    |                |
>>>>>>>> // | street      | varchar(255)                        | NO   |     | NULL    |                |
>>>>>>>> // | city        | varchar(255)                        | NO   |     | NULL    |                |
>>>>>>>> // | state       | varchar(255)                        | NO   |     | NULL    |                |
>>>>>>>> // | zip         | varchar(255)                        | NO   |     | NULL    |                |
>>>>>>>> // | type        | enum('SHIPPING','BILLING','LIVING') | NO   |     | NULL    |                |
>>>>>>>> // +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>>>
>>>>>>>> tableEnv.executeSql("""
>>>>>>>> CREATE TABLE topic_addresses (
>>>>>>>>   -- schema is totally the same to the MySQL "addresses" table
>>>>>>>>   id INT,
>>>>>>>>   customer_id INT,
>>>>>>>>   street STRING,
>>>>>>>>   city STRING,
>>>>>>>>   state STRING,
>>>>>>>>   zip STRING,
>>>>>>>>   type STRING,
>>>>>>>>   PRIMARY KEY (id) NOT ENFORCED
>>>>>>>> ) WITH (
>>>>>>>>   'connector' = 'kafka',
>>>>>>>>   'topic' = 'dbserver1.inventory.addresses',
>>>>>>>>   'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>>>>>>   'properties.group.id' = 'testGroup',
>>>>>>>>   'format' = 'debezium-json' -- using debezium-json as the format
>>>>>>>> )
>>>>>>>> """)
>>>>>>>>
>>>>>>>> val table = tableEnv.from("topic_addresses").select($"*")
>>>>>>>>
>>>>>>>> // Defining a PK automatically puts it in Upsert mode, which we want.
>>>>>>>> // TODO: type should be a keyword, is that acceptable in the DDL?
>>>>>>>> tableEnv.executeSql("""
>>>>>>>> CREATE TABLE ESAddresses (
>>>>>>>>   id INT,
>>>>>>>>   customer_id INT,
>>>>>>>>   street STRING,
>>>>>>>>   city STRING,
>>>>>>>>   state STRING,
>>>>>>>>   zip STRING,
>>>>>>>>   type STRING,
>>>>>>>>   PRIMARY KEY (id) NOT ENFORCED
>>>>>>>> ) WITH (
>>>>>>>>   'connector' = 'elasticsearch-7',
>>>>>>>>   'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
>>>>>>>>   'index' = 'flinkaddresses',
>>>>>>>>   'format' = 'json'
>>>>>>>> )
>>>>>>>> """)
>>>>>>>>
>>>>>>>> table.executeInsert("ESAddresses").print()
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <Re...@remind101.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <im...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Regarding the performance difference, the proposed way will have
>>>>>>>>>> one more stateful operator (deduplication) than the native 1.11 CDC
>>>>>>>>>> support.
>>>>>>>>>> The overhead of the deduplication operator is similar to a simple
>>>>>>>>>> group-by aggregate (max on each non-key column).
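>>>>>>>>>>
>>>>>>>>>> As a rough mental model (an illustrative sketch only, not a query you
>>>>>>>>>> would actually run), the state it keeps is comparable to:
>>>>>>>>>>
>>>>>>>>>> -- one row of state per key, one aggregated value per non-key column
>>>>>>>>>> SELECT id, MAX(customer_id), MAX(street), MAX(city), MAX(state),
>>>>>>>>>>        MAX(zip), MAX(`type`)
>>>>>>>>>> FROM topic_addresses
>>>>>>>>>> GROUP BY id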
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jark
>>>>>>>>>>
>>>>>>>>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <Re...@remind101.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thank you so much for the help!
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <
>>>>>>>>>>> marta@ververica.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Yes — you'll get the full row in the payload; and you can also
>>>>>>>>>>>> access the change operation, which might be useful in your case.
>>>>>>>>>>>>
>>>>>>>>>>>> About performance, I'm summoning Kurt and @Jark Wu
>>>>>>>>>>>> <ja...@apache.org> to the thread, who will be able to give you
>>>>>>>>>>>> a more complete answer and likely also some optimization tips for your
>>>>>>>>>>>> specific use case.
>>>>>>>>>>>>
>>>>>>>>>>>> Marta
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <Re...@remind101.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Yup! This definitely helps and makes sense.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The 'after' payload comes with all data from the row, right? So
>>>>>>>>>>>>> essentially, for inserts and updates I can insert/replace data by pk, for
>>>>>>>>>>>>> null values I just delete by pk, and then I can build out the rest of my
>>>>>>>>>>>>> joins like normal.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Are there any performance implications of doing it this way
>>>>>>>>>>>>> that is different from the out-of-the-box 1.11 solution?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
>>>>>>>>>>>>> marta@ververica.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi, Rex.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Part of what enabled CDC support in Flink 1.11 was the
>>>>>>>>>>>>>> refactoring of the table source interfaces (FLIP-95 [1]), and the new
>>>>>>>>>>>>>> ScanTableSource [2], which allows emitting bounded/unbounded streams with
>>>>>>>>>>>>>> insert, update and delete rows.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In theory, you could consume data generated with Debezium as
>>>>>>>>>>>>>> regular JSON-encoded events before Flink 1.11 — there just wasn't a
>>>>>>>>>>>>>> convenient way to really treat it as "changelog". As a workaround, what you
>>>>>>>>>>>>>> can do in Flink 1.10 is process these messages as JSON and extract the
>>>>>>>>>>>>>> "after" field from the payload, and then apply de-duplication [3] to keep
>>>>>>>>>>>>>> only the last row.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The DDL for your source table would look something like:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> CREATE TABLE tablename (
>>>>>>>>>>>>>>   ...
>>>>>>>>>>>>>>   after ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>   'connector' = 'kafka',
>>>>>>>>>>>>>>   'format' = 'json',
>>>>>>>>>>>>>>   ...
>>>>>>>>>>>>>> );
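>>>>>>>>>>>>>>
>>>>>>>>>>>>>> A rough sketch of the follow-up de-duplication step (assuming the
>>>>>>>>>>>>>> addresses columns from this thread and a processing-time attribute,
>>>>>>>>>>>>>> e.g. `proctime AS PROCTIME()`, declared on the source table) could be:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -- keep only the most recent row per key (deduplication pattern [3])
>>>>>>>>>>>>>> SELECT id, customer_id, street, city, state, zip, `type`
>>>>>>>>>>>>>> FROM (
>>>>>>>>>>>>>>   SELECT
>>>>>>>>>>>>>>     t.`after`.id          AS id,
>>>>>>>>>>>>>>     t.`after`.customer_id AS customer_id,
>>>>>>>>>>>>>>     t.`after`.street      AS street,
>>>>>>>>>>>>>>     t.`after`.city        AS city,
>>>>>>>>>>>>>>     t.`after`.state       AS state,
>>>>>>>>>>>>>>     t.`after`.zip         AS zip,
>>>>>>>>>>>>>>     t.`after`.`type`      AS `type`,
>>>>>>>>>>>>>>     ROW_NUMBER() OVER (PARTITION BY t.`after`.id
>>>>>>>>>>>>>>                        ORDER BY t.proctime DESC) AS row_num
>>>>>>>>>>>>>>   FROM tablename t
>>>>>>>>>>>>>> )
>>>>>>>>>>>>>> WHERE row_num = 1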
>>>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Marta
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>>>>>>>>>> [3]
>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <
>>>>>>>>>>>>>> chesnay@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Jark Would it be possible to use the 1.11 debezium support
>>>>>>>>>>>>>>> in 1.10?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm trying to set up Flink with Debezium CDC Connector on
>>>>>>>>>>>>>>> AWS EMR, however, EMR only supports Flink 1.10.0, whereas Debezium
>>>>>>>>>>>>>>> Connector arrived in Flink 1.11.0, from looking at the documentation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>>>>>>>>>> connecting Debezium to Flink? Is there an open source Debezium connector
>>>>>>>>>>>>>>> that works with Flink 1.10.0? Could I potentially pull the code out for the
>>>>>>>>>>>>>>> 1.11.0 Debezium connector and compile it in my project using Flink 1.10.0
>>>>>>>>>>>>>>> api?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For context, I plan on doing some fairly complicated long
>>>>>>>>>>>>>>> lived stateful joins / materialization using the Table API over data
>>>>>>>>>>>>>>> ingested from Postgres and possibly MySQL.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Appreciate any help, thanks!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>>
>>>>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>>
>>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>
>>>>>>>>
>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>
>>>>>>
>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>> <https://www.facebook.com/remindhq>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Arvid Heise | Senior Java Developer
>>>>>
>>>>> <https://www.ververica.com/>
>>>>>
>>>>> Follow us @VervericaData
>>>>>
>>>>> --
>>>>>
>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>> Conference
>>>>>
>>>>> Stream Processing | Event Driven | Real Time
>>>>>
>>>>> --
>>>>>
>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>
>>>>> --
>>>>> Ververica GmbH
>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>>>>> Ji (Toni) Cheng
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>

Re: Debezium Flink EMR

Posted by Rex Fenley <Re...@remind101.com>.
This worked, thanks! Looking forward to the future releases :)

On Mon, Aug 31, 2020 at 5:06 PM Marta Paes Moreira <ma...@ververica.com>
wrote:

> Hey, Rex!
>
> This is likely due to the tombstone records that Debezium produces for
> DELETE operations (i.e. a record with the same key as the deleted row and a
> value of null). These are markers for Kafka to indicate that log
> compaction can remove all records for the given key, and the initial
> implementation of the debezium-format can't handle them. This issue is
> already documented (and solved) in [1].
>
> In the meantime, can you try adding "tombstones.on.delete": "false" to the
> configuration of your Debezium MySQL connector?
>
> Marta
> [1] https://issues.apache.org/jira/browse/FLINK-18705

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>