Posted to user@flink.apache.org by Youzha <yu...@gmail.com> on 2020/11/16 15:20:24 UTC
left join flink stream
Hi, I want to join a Kafka stream against a MySQL reference table.
How can I do this with Flink streaming? Can the coGroup function handle
this? Or does anyone have Java sample code for this case? I've read some
articles saying that coGroup can do a left outer join, but I'm still
unsure how to implement it because I have only just started learning Flink streaming.
Please advise.
Re: left join flink stream
Posted by Guowei Ma <gu...@gmail.com>.
Hi, Youzha
Sorry for the late reply. It looks like a type mismatch.
Could you share the output of:
1. tableA.printSchema(), to print the schema?
2. KafkaSource.getType(), to print the type information?
Best,
Guowei
On Mon, Nov 23, 2020 at 5:28 PM Youzha <yu...@gmail.com> wrote:
> Hi, this is the sample code:
>
> Table tableA = tEnv.fromDataStream(KafkaSource,"timestamp, id, status");
>
> tEnv.registerTable("tbl_kafka", tableA);
>
> Table result = tEnv.sqlQuery("select * from tbl_kafka where id = 'E02'");
>
> FYI, I'm using the Avro format on my KafkaSource.
>
> Best Regards,
Re: left join flink stream
Posted by Guowei Ma <gu...@gmail.com>.
Could you share your code?
Best,
Guowei
Re: left join flink stream
Posted by tkg_cangkul <yu...@gmail.com>.
Hi,
I'm using Java for this,
and I've successfully registered the tables.
I can select from each table individually:
Table result1 = tEnv.sqlQuery("select status_code from table_kafka");
Table result2 = tEnv.sqlQuery("select status_code from table_mysql_reff");
But when I try a join query, I get an error message like this:
Caused by: org.apache.flink.table.api.TableException: Generic RAW types
must have a common type information. at
org.apache.flink.table.planner.calcite.FlinkTypeFactory.resolveAllIdenticalTypes(FlinkTypeFactory.scala:381)
Is there something I missed here?
Re: left join flink stream
Posted by Guowei Ma <gu...@gmail.com>.
Hi
One way would look like the following:
1. Create the probe table from Kafka as follows. You can find more
detailed information in the docs [1].
CREATE TABLE myTopic (
  id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3),
  proctime AS PROCTIME()  -- processing-time attribute used by the temporal join in step 3
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
);
2. Create the build table from MySQL as follows. You can find more
detailed information in the docs [2].
CREATE TABLE MyUserTable (
  id BIGINT,
  sex STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users'
);
3. Join the tables as follows. You can find more detailed information
in the docs [3].
-- temporal join the JDBC table as a dimension table
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.id = MyUserTable.id;
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
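To make the semantics of the processing-time temporal join in step 3 more concrete, here is a minimal plain-Java sketch. It does not use any Flink API; the class `ProcTimeLookupSketch` and its methods are hypothetical names made up for illustration, and an in-memory map stands in for the MySQL table. The idea it illustrates is that each stream record is joined against whatever the dimension table contains at the moment the record is processed, and that a record with no matching row is still emitted (left join).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: a plain-Java model of a processing-time lookup join.
class ProcTimeLookupSketch {
    // Stand-in for the MySQL dimension table: id -> status (assumed columns).
    private final Map<Long, String> dimension = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Simulates an INSERT/UPDATE on the reference table.
    void upsert(long id, String status) {
        dimension.put(id, status);
    }

    // Called once per arriving stream record; reads the row as it is right now.
    void onEvent(long id, String behavior) {
        String status = dimension.get(id); // null when no row matches: the event is kept
        output.add(id + "," + behavior + "," + status);
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        ProcTimeLookupSketch job = new ProcTimeLookupSketch();
        job.onEvent(7L, "click");   // no row for id 7 yet
        job.upsert(7L, "active");
        job.onEvent(7L, "buy");     // sees "active"
        job.upsert(7L, "blocked");
        job.onEvent(7L, "view");    // sees "blocked"; earlier results are not revised
        job.output().forEach(System.out::println);
    }
}
```

In this model, results that were already emitted are never updated when the table changes later. A real connector may also cache looked-up rows, in which case a record could see a slightly older row than the one currently in the database.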
Best,
Guowei
Re: left join flink stream
Posted by tkg_cangkul <yu...@gmail.com>.
Hi Guowei Ma,
Thanks for your reply.
In my case, I have some data in my Kafka topic, and I want to get the details
for that data from my MySQL reference table.
For example:
In my Kafka topic I have these fields:
id, name, position, experience
In my MySQL reference table I have these fields:
id, name, age, sex
So I want to do a left join to get the detail data from my reference table.
How can I do this with Flink?
Please advise.
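As a reference for what the requested left join should produce, here is a small plain-Java sketch using the fields listed above. It is not Flink code: the classes `Event`, `Person`, and `Enriched`, the method `leftJoin`, and the field types (for example `experience` as an `int`) are assumptions made for illustration, and a map keyed by `id` stands in for the MySQL table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: left-join semantics for the Kafka and MySQL fields above.
class LeftJoinSketch {
    static final class Event {            // one Kafka record (assumed types)
        final long id; final String name; final String position; final int experience;
        Event(long id, String name, String position, int experience) {
            this.id = id; this.name = name; this.position = position; this.experience = experience;
        }
    }

    static final class Person {           // one row of the MySQL reference table
        final long id; final String name; final Integer age; final String sex;
        Person(long id, String name, Integer age, String sex) {
            this.id = id; this.name = name; this.age = age; this.sex = sex;
        }
    }

    static final class Enriched {         // join result: all event fields plus age and sex
        final long id; final String name; final String position; final int experience;
        final Integer age; final String sex;
        Enriched(Event e, Person p) {
            this.id = e.id; this.name = e.name; this.position = e.position;
            this.experience = e.experience;
            this.age = (p == null) ? null : p.age;   // no match: reference columns stay null
            this.sex = (p == null) ? null : p.sex;
        }
    }

    // Every event produces exactly one output row, whether or not a reference row exists.
    static List<Enriched> leftJoin(List<Event> events, Map<Long, Person> reference) {
        List<Enriched> out = new ArrayList<>();
        for (Event e : events) {
            out.add(new Enriched(e, reference.get(e.id)));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Long, Person> reference = new HashMap<>();
        reference.put(1L, new Person(1L, "Ana", 31, "F"));
        List<Event> events = Arrays.asList(
                new Event(1L, "Ana", "engineer", 5),
                new Event(2L, "Budi", "analyst", 2)); // id 2 has no reference row
        for (Enriched r : leftJoin(events, reference)) {
            System.out.println(r.id + "," + r.name + "," + r.position + ","
                    + r.experience + "," + r.age + "," + r.sex);
        }
    }
}
```

The second event has no matching row, so it is still emitted with `age` and `sex` left as null, which is the difference between a left join and an inner join.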
Re: left join flink stream
Posted by Guowei Ma <gu...@gmail.com>.
Hi, Youzha
In general, `CoGroup` is for window-based operations. Whether it can
satisfy your requirements depends on your specific scenario. But if you
want to treat the MySQL table as a dimension table, there are two other
ways:
1. Using the Table/SQL API. You can find a SQL example (a temporal join with
the JDBC table as a dimension table) in the JDBC table connector docs [1], and
more join information in [2].
2. Using the DataStream API. You could check whether the async I/O function
satisfies your requirements. You can find an example in [3].
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html
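The idea behind the second option can be sketched in plain Java with `CompletableFuture`. This is not Flink's async I/O API, only a model of the pattern it is built around: each record triggers a non-blocking lookup, many lookups are in flight at once, and results are emitted in input order. The class `AsyncLookupSketch`, its methods, and the in-memory map standing in for MySQL are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Illustration only: asynchronous enrichment of stream records by key lookup.
class AsyncLookupSketch {
    // Stand-in for an asynchronous database client call: returns the name for an id.
    static CompletableFuture<String> lookup(long id, Map<Long, String> table, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> table.get(id), pool);
    }

    // Starts all lookups first, then collects the results in input order.
    static List<String> enrich(List<Long> ids, Map<Long, String> table) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> pending = ids.stream()
                    .map(id -> lookup(id, table, pool)
                            .thenApply(name -> id + "," + name)) // name is null when no row matches
                    .collect(Collectors.toList());
            return pending.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<Long, String> table = new HashMap<>();
        table.put(1L, "Ana");
        enrich(Arrays.asList(1L, 2L), table).forEach(System.out::println);
    }
}
```

The map is only read while lookups are running, so a plain `HashMap` is sufficient here; a job that updates its reference data concurrently would need a thread-safe structure or a real asynchronous database client.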
Best,
Guowei