Posted to user@flink.apache.org by Youzha <yu...@gmail.com> on 2020/11/16 15:20:24 UTC

left join flink stream

Hi, I want to join a Kafka stream against a MySQL reference table.
How can I do this with Flink streaming? Can the coGroup function handle
this? Or does anyone have Java sample code for this case? I've read an
article that said the coGroup function can do a left outer join, but I'm still
not sure how to implement it because I have only just started learning Flink streaming.


Any advice would be appreciated.

Re: left join flink stream

Posted by Guowei Ma <gu...@gmail.com>.
Hi, Youzha
Sorry for the late reply. It looks like a type mismatch.
Could you
1. call tableA.printSchema() to print the schema?
2. call KafkaSource.getType() and print the type information?

Best,
Guowei


On Mon, Nov 23, 2020 at 5:28 PM Youzha <yu...@gmail.com> wrote:

> Hi, this is the sample code:
>
> Table tableA = tEnv.fromDataStream(KafkaSource, "timestamp, id, status");
>
> tEnv.registerTable("tbl_kafka", tableA);
>
> Table result = tEnv.sqlQuery("select * from tbl_kafka where id = 'E02'");
>
> FYI, I'm using the Avro format on my KafkaSource.
>
> Best Regards,
>

Re: left join flink stream

Posted by Guowei Ma <gu...@gmail.com>.
Could you share your code?
Best,
Guowei



Re: left join flink stream

Posted by tkg_cangkul <yu...@gmail.com>.
Hi,

I'm using Java for this,
and I've successfully registered the tables.

I can select from each table on its own:

Table result1 = tEnv.sqlQuery("select status_code from table_kafka");
Table result2 = tEnv.sqlQuery("select status_code from table_mysql_reff");

But when I try the join query I get an error message like this:

Caused by: org.apache.flink.table.api.TableException: Generic RAW types must have a common type information.
    at org.apache.flink.table.planner.calcite.FlinkTypeFactory.resolveAllIdenticalTypes(FlinkTypeFactory.scala:381)


Is there something that I missed here?



Re: left join flink stream

Posted by Guowei Ma <gu...@gmail.com>.
Hi
One way would look like the following.
1. Create the probe table from Kafka as follows. You can find more
detailed information in doc [1].

CREATE TABLE myTopic (
  id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
)

2. Create the build table from MySQL as follows. You can find more
detailed information in doc [2].

CREATE TABLE MyUserTable (
  id BIGINT,
  sex STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users'
);

3. Join the tables as follows. You can find more detailed information
in doc [3].

-- temporal join the JDBC table as a dimension table
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
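
A note on the three steps above: the myTopic DDL in step 1 declares neither a proctime nor a key column, so the join in step 3 would not resolve against it as written. The sketch below is one possible adjustment, not a tested setup. It assumes that id is the join key on both sides and adds a processing-time column (proctime AS PROCTIME()) to the Kafka table. The class name LookupJoinStatements and the aliases t and u are made up for illustration. It deliberately has no Flink dependency and only assembles the statements; with a Flink 1.11 TableEnvironment named tEnv, the two DDL strings would go to tEnv.executeSql(...) and the query to tEnv.sqlQuery(...).

```java
public class LookupJoinStatements {
    // Probe side: the Kafka table from step 1, plus a processing-time
    // attribute. The proctime column is an addition (assumption); the
    // DDL in the message above does not declare it.
    static final String KAFKA_DDL = String.join("\n",
        "CREATE TABLE myTopic (",
        "  id BIGINT,",
        "  item_id BIGINT,",
        "  category_id BIGINT,",
        "  behavior STRING,",
        "  ts TIMESTAMP(3),",
        "  proctime AS PROCTIME()",
        ") WITH (",
        "  'connector' = 'kafka',",
        "  'topic' = 'user_behavior',",
        "  'properties.bootstrap.servers' = 'localhost:9092',",
        "  'properties.group.id' = 'testGroup',",
        "  'format' = 'csv',",
        "  'scan.startup.mode' = 'earliest-offset'",
        ")");

    // Build side: the JDBC table from step 2, unchanged.
    static final String MYSQL_DDL = String.join("\n",
        "CREATE TABLE MyUserTable (",
        "  id BIGINT,",
        "  sex STRING,",
        "  age INT,",
        "  status BOOLEAN,",
        "  PRIMARY KEY (id) NOT ENFORCED",
        ") WITH (",
        "  'connector' = 'jdbc',",
        "  'url' = 'jdbc:mysql://localhost:3306/mydatabase',",
        "  'table-name' = 'users'",
        ")");

    // Left lookup join: every Kafka row is kept; MySQL columns are NULL
    // when no row with the same id exists. Joining on id is an assumption.
    static final String JOIN_QUERY = String.join("\n",
        "SELECT t.id, t.behavior, u.sex, u.age",
        "FROM myTopic AS t",
        "LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF t.proctime AS u",
        "ON t.id = u.id");

    public static void main(String[] args) {
        // Print the statements in the order they would be submitted.
        System.out.println(KAFKA_DDL + ";\n");
        System.out.println(MYSQL_DDL + ";\n");
        System.out.println(JOIN_QUERY + ";");
    }
}
```

Selecting explicit columns instead of SELECT * avoids having two columns named id in the result.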

Best,
Guowei



Re: left join flink stream

Posted by tkg_cangkul <yu...@gmail.com>.
Hi Guowei Ma,

Thanks for your reply.
In my case, I have some data in my Kafka topic, and I want to get the
details of that data from my MySQL reference table.
For example:

In my Kafka topic I have these fields:

id, name, position, experience

In my MySQL reference table I have these fields:

id, name, age, sex

So I want to do a left join to get the detail data from my reference table.

How can I do this with Flink?
Please advise.



Re: left join flink stream

Posted by Guowei Ma <gu...@gmail.com>.
Hi, Youzha

In general, `CoGroup` is for window-based operations. Whether it can
satisfy your requirements depends on your specific scenario. But if you
want to treat the MySQL table as a dimension table, there are two other
ways:
1. Using the Table/SQL API. You can find a SQL example (a temporal join with the
JDBC table as a dimension table) in the JDBC table connector docs [1] and more
join information in [2].
2. Using the DataStream API. You could check whether the `Async I/O` function
can satisfy your requirements. You can find an example in [3].


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html
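
To make the second option more concrete, here is a plain-Java sketch of what a per-record asynchronous lookup with left-join semantics does. It has no Flink dependency and is not the Flink API: in a real job this logic would sit inside an async function wired up as described in [3], and the lookup would query MySQL. The class AsyncLookupLeftJoin, the UserDetail type, the in-memory REFERENCE map, and the sample values are all made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class AsyncLookupLeftJoin {
    // Hypothetical row of the MySQL reference table, keyed by id.
    static final class UserDetail {
        final int age;
        final String sex;

        UserDetail(int age, String sex) {
            this.age = age;
            this.sex = sex;
        }
    }

    // In-memory stand-in for the MySQL table (assumption: a real job
    // would issue a database query here instead).
    static final Map<Long, UserDetail> REFERENCE =
        Map.of(1L, new UserDetail(30, "F"));

    // Asynchronous lookup by key; an empty Optional means "no match".
    static CompletableFuture<Optional<UserDetail>> lookup(long id) {
        return CompletableFuture.supplyAsync(
            () -> Optional.ofNullable(REFERENCE.get(id)));
    }

    // Left join for one stream record: the record is always emitted,
    // and the reference columns become "null" when the lookup misses.
    static CompletableFuture<String> enrich(long id, String name) {
        return lookup(id).thenApply(detail ->
            id + "," + name + ","
                + detail.map(u -> u.age + "," + u.sex).orElse("null,null"));
    }

    public static void main(String[] args) {
        // Two stream records: id 1 has a reference row, id 2 does not.
        List<CompletableFuture<String>> pending =
            List.of(enrich(1L, "alice"), enrich(2L, "bob"));
        pending.forEach(f -> System.out.println(f.join()));
    }
}
```

The point of the sketch is the left-join behaviour: a record with no match in the reference table is still emitted, with empty reference columns, rather than dropped.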

Best,
Guowei

