Posted to user@flink.apache.org by 徐涛 <ha...@gmail.com> on 2019/03/12 06:13:42 UTC

Re: How to join stream and dimension data in Flink?

Hi Hequn,
	I want to implement a join between a stream and a dimension table in Flink SQL. I found a new feature named Temporal Tables, delivered in Flink 1.7, and I think it could be used to join a stream with a dimension table, but I am not sure about that. Could anyone help me with this?
	Thanks a lot for your help.

Best 
Henry

> On Sep 26, 2018, at 12:16 AM, Hequn Cheng <ch...@gmail.com> wrote:
> 
> Hi vino,
> 
> Thanks for sharing the link. It's a great book and I will take a look. 
> There are several kinds of join, and different joins have different semantics. From the link, I think it refers to the time-versioned join. FLINK-9712 <https://issues.apache.org/jira/browse/FLINK-9712> covers enrichment joins with Time Versioned Functions, and the result is deterministic under event time.
> 
> Best, Hequn
> 
> On Tue, Sep 25, 2018 at 11:05 PM vino yang <yanghua1127@gmail.com> wrote:
> Hi Hequn,
> 
> The book does not draw a right-or-wrong conclusion, but it illustrates this phenomenon: when two streams with the same input are replayed and joined, the join results can differ between runs because of the order of events. This is because the two streams can be interleaved in different ways. This has nothing to do with orderBy; it simply exists in the stream-stream join. Of course, this phenomenon is only notable in comparison with a non-stream join.
> 
> In addition, I recommend this book, which is very well known on Twitter and Amazon. Because you are also Chinese, there is a good translation here. If I guess correctly, the main translator is also from your company. The part I mentioned is here. [1]
> 
> [1]: https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
> 
> Thanks, vino.
> 
> On Tue, Sep 25, 2018 at 9:45 PM Hequn Cheng <chenghequn@gmail.com> wrote:
> Hi vino,
> 
> There are no ordering problems with the stream-stream join in Flink. No matter what order the elements arrive in, the stream-stream join in Flink outputs results that are consistent with standard SQL semantics. I haven't read the book you mentioned. A join does not guarantee output order; you have to use orderBy if you want ordered results.
> 
> Best, Hequn
> 
> On Tue, Sep 25, 2018 at 8:36 PM vino yang <yanghua1127@gmail.com> wrote:
> Hi Fabian,
> 
> I may not have stated it clearly here: there is no semantic problem at the Flink implementation level. Rather, there may be a “time dependence” here. [1]
> 
> Yes, my initial answer was not to use this form of join in this scenario, but Henry said he converted the table into a stream table and asked about the feasibility of other methods.
> 
> [1]: 《Designing Data-Intensive Applications》 by Martin Kleppmann, Part 3: Derived Data, Chapter 11: Stream Processing, Stream Joins.
> 
> An excerpt:
> If the ordering of events across streams is undetermined, the join becomes nondeterministic [87], which means you cannot rerun the same job on the same input and necessarily get the same result: the events on the input streams may be interleaved in a different way when you run the job again.
> 
> 
> 
> On Tue, Sep 25, 2018 at 8:08 PM Fabian Hueske <fhueske@gmail.com> wrote:
> Hi,
> 
> I don't think that using the current join implementation in the Table API / SQL will work.
> The non-windowed join fully materializes *both* input tables in state. This is necessary, because the join needs to be able to process updates on either side.
> While this is not a problem for the fixed sized MySQL table, materializing the append-only table (aka stream) is probably not what you want.
> You can also not limit idle state retention because it would remove the MySQL table from state at some point.
> 
> The only way to make it work is using a user-defined TableFunction that queries the MySQL table via JDBC. 
> However, please note that these calls would be synchronous, blocking calls.
> 
> @Vino: Why do you think that the stream & stream join is not mature and which problems do you see in the semantics? 
> The semantics are correct (standard SQL semantics) and in my opinion the implementation is also mature.
> However, you should not use the non-windowed join if any of the input tables is ever-growing, because both sides must be held in state. This is not an issue of the semantics.
> 
> Cheers,
> Fabian
> 
> On Tue, Sep 25, 2018 at 2:00 PM vino yang <yanghua1127@gmail.com> wrote:
> Hi Henry,
> 
> 1) I don't really recommend this method, but you said that you want to convert the MySQL table to a stream and then to a Flink table. Under this premise, I said that you can do this by joining two stream tables. But as you know, this join depends on how long the state is retained. To make it equivalent to a dimension table, you must permanently keep the state of the stream table that serves as the "dimension table". What I meant is that this can be done by modifying the relevant configuration in Flink, which applies to the whole job and not to a single table.
> 
> 2) Imagine that there are one million records in the two tables. The records of both tables have only just begun to stream into Flink, and the records of the dimension table have not fully arrived. Therefore, your matching results may not be as accurate as directly querying MySQL.
> 
> In fact, the current stream-stream join is not very mature and has some problems in its semantics. I personally recommend that you go back to a stream/batch (MySQL) join. For the underlying principles, I recommend a book, commonly referred to as 《DDIA》.
> 
> Thanks, vino.
> 
> On Tue, Sep 25, 2018 at 5:48 PM 徐涛 <happydexutao@gmail.com> wrote:
> Hi Vino,
> 	I do not quite understand some of the sentences below. Could you please explain them in a bit more detail?
> 	1. “such as setting the state retention time of one of the tables to be permanent”: as far as I know, the state retention time is a global config, and I cannot set this property per table.
> 	2. “you may not be able to match the results, because the data belonging to the mysql table is just beginning to play as a stream”: why would it not be able to match the results?
> 
> Best
> Henry
> 
>> On Sep 25, 2018, at 5:29 PM, vino yang <yanghua1127@gmail.com> wrote:
>> 
>> Hi Henry,
>> 
>> If you have converted the MySQL table to a Flink stream table, a stream-stream join in the Flink Table/SQL API can also do this, for example by setting the state retention time of one of the tables to be permanent. But when the job has only just started, you may not be able to match the results, because the data belonging to the MySQL table has only just begun to be replayed as a stream.
>> 
>> Thanks, vino.
>> 
>> On Tue, Sep 25, 2018 at 5:10 PM 徐涛 <happydexutao@gmail.com> wrote:
>> Hi Vino & Hequn,
>> 	I am now using the Table/SQL API. If I import the MySQL table as a stream and then convert it into a table, it seems that this can also be a workaround for batch/streaming joins. May I ask how this differs from the UDTF method? Does this implementation have any defects?
>> 	
>> Best
>> Henry
>> 
>>> On Sep 22, 2018, at 10:28 AM, Hequn Cheng <chenghequn@gmail.com> wrote:
>>> 
>>> Hi
>>> 
>>> +1 for vino's answer. 
>>> Also, this kind of join will be supported in FLINK-9712 <https://issues.apache.org/jira/browse/FLINK-9712>. You can check more details in the jira.
>>> 
>>> Best, Hequn
>>> 
>>> On Fri, Sep 21, 2018 at 4:51 PM vino yang <yanghua1127@gmail.com> wrote:
>>> Hi Henry,
>>> 
>>> There are three ways I can think of:
>>> 
>>> 1) use DataStream API, implement a flatmap UDF to access dimension table;
>>> 2) use table/sql API, implement a UDTF to access dimension table;
>>> 3) customize the table/sql join API/statement's implementation (and change the physical plan)
>>> 
>>> Thanks, vino.
>>> 
>>> On Fri, Sep 21, 2018 at 4:43 PM 徐涛 <happydexutao@gmail.com> wrote:
>>> Hi All,
>>>         Sometimes a “dimension table” needs to be joined with the "fact table" if the data were not joined before being sent to Kafka.
>>>         So if the data are joined in Flink, does the “dimension table” have to be imported as a stream, or are there other ways to achieve this?
>>>         Thanks a lot!
>>> 
>>> Best
>>> Henry
>> 
> 
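
The lookup-style approach from the quoted thread (a flatMap UDF or a UDTF that queries the dimension table for each record) can be sketched roughly as follows. This is only an illustration in plain Python, not Flink code: an in-memory SQLite database stands in for the MySQL table, and the names `dim_user`, `lookup_city`, and `enrich` are made up for the example. The lookup is synchronous and blocking, as Fabian notes in the thread; the cache is an added assumption that trades freshness for fewer queries.

```python
import sqlite3
from functools import lru_cache

# Assumption: in-memory SQLite stands in for the MySQL dimension table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_user (user_id INTEGER PRIMARY KEY, city TEXT)")
conn.executemany(
    "INSERT INTO dim_user VALUES (?, ?)", [(1, "Berlin"), (2, "Hangzhou")]
)


@lru_cache(maxsize=1024)
def lookup_city(user_id):
    """Blocking lookup of one dimension row; cached, so later updates are not seen."""
    row = conn.execute(
        "SELECT city FROM dim_user WHERE user_id = ?", (user_id,)
    ).fetchone()
    return row[0] if row else None


def enrich(fact_stream):
    """Flat-map style enrichment: emit a joined record only when the key matches."""
    for user_id, amount in fact_stream:
        city = lookup_city(user_id)
        if city is not None:
            yield (user_id, amount, city)


# Hypothetical fact records: (user_id, amount).
facts = [(1, 10.0), (3, 5.0), (2, 7.5), (1, 2.5)]
enriched = list(enrich(facts))
print(enriched)
```

Only the fact stream drives the computation here; nothing from the stream is kept in state, which is the main difference from importing the dimension table as a second stream.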


Re: How to join stream and dimension data in Flink?

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi Henry,

1. Also take a look at the regular joins limitations <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#regular-joins>:

> However, this operation has an important implication: it requires to keep both sides of the join input in Flink’s state forever. Thus, the resource usage will grow
> indefinitely as well, if one or both input tables are continuously growing.

4. Our current grammar for temporal table joins is a stopgap solution that is ANSI SQL compliant. Unfortunately, the SQL standard lags behind the streaming requirements, and we are working on addressing this issue. [1]

5. It will be executed in a similar fashion to how you would expect a regular hash join to be executed: the “WHERE” join condition will be pushed into the temporal table join operator.

6. I don’t think that Flink supports the syntax suggested by Hequn. Currently outer joins are not supported with temporal tables.
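
As a rough illustration of point 5, the sketch below models an event-time temporal table join in plain Python. It is not Flink's implementation; the rate values, the `rate_as_of` helper, and the tuple layouts are assumptions made up for the example. The point is that the equality condition on `currency` acts as the lookup key inside the join, so no cross product is built and filtered afterwards.

```python
from bisect import bisect_right
from collections import defaultdict

# Hypothetical rate history: (rowtime, currency, rate).
rates_history = [
    (1, "EUR", 1.25),
    (1, "USD", 1.0),
    (5, "EUR", 1.5),
]

# Build side: versions grouped by the join key and sorted by time.
versions = defaultdict(list)
for ts, currency, rate in sorted(rates_history):
    versions[currency].append((ts, rate))


def rate_as_of(currency, rowtime):
    """Return the rate valid for `currency` at `rowtime`, or None if none exists yet."""
    history = versions.get(currency, [])
    times = [ts for ts, _ in history]
    idx = bisect_right(times, rowtime)
    return history[idx - 1][1] if idx > 0 else None


def temporal_join(orders):
    """Probe side: one keyed, versioned lookup per order (inner-join behaviour)."""
    for rowtime, currency, amount in orders:
        rate = rate_as_of(currency, rowtime)
        if rate is not None:
            yield (rowtime, currency, amount * rate)


# Hypothetical orders: (rowtime, currency, amount).
orders = [(2, "EUR", 10.0), (6, "EUR", 10.0), (3, "USD", 4.0), (4, "GBP", 1.0)]
converted = list(temporal_join(orders))
print(converted)
```

The two EUR orders pick up different rates because each one sees the version that was valid at its own rowtime, and the GBP order is dropped because no rate exists for it.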

Piotrek

[1] https://issues.apache.org/jira/browse/CALCITE-1917
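
Regarding point 1, a toy model may help show why a regular (non-windowed) join has to keep both inputs in state. The class below is a plain-Python sketch under my own assumptions, not Flink's operator; `RegularJoin` and its methods are hypothetical names.

```python
from collections import defaultdict


class RegularJoin:
    """Toy non-windowed inner join on a key; every row of both inputs is retained."""

    def __init__(self):
        self.left_state = defaultdict(list)   # key -> rows seen on the left input
        self.right_state = defaultdict(list)  # key -> rows seen on the right input

    def on_left(self, key, row):
        self.left_state[key].append(row)
        return [(key, row, r) for r in self.right_state.get(key, [])]

    def on_right(self, key, row):
        self.right_state[key].append(row)
        return [(key, left_row, row) for left_row in self.left_state.get(key, [])]

    def state_size(self):
        left = sum(len(rows) for rows in self.left_state.values())
        right = sum(len(rows) for rows in self.right_state.values())
        return left + right


join = RegularJoin()
join.on_right("EUR", 1.5)              # one dimension row
results = []
for i in range(1000):                  # a continuously growing stream
    results += join.on_left("EUR", float(i))
late = join.on_left("GBP", 1.0)        # no match yet, but it must be remembered
late += join.on_right("GBP", 3.0)      # a later row on the other side still joins
print(len(results), join.state_size(), late)
```

Because a row arriving on either side may match rows that arrived earlier on the other side, nothing can be dropped safely, and the state grows with the stream.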


> On 14 Mar 2019, at 03:31, Hequn Cheng <ch...@gmail.com> wrote:
> 
> Hi Henry,
> 
> These are good questions! 
> I would rather not add the "temporal" and "lateral" prefixes in front of "join". The temporal table is a concept orthogonal to the join. We should say "join a temporal table" or "join a lateral table".
> 1. You can of course use a stream-stream join. Introducing the temporal table not only makes the query simpler but also improves performance. More detail can be found in [1].
> 2. Both joins are based on the concept of a temporal table, i.e., a table joins a temporal table.
> 3. Yes, actually the join in Flink uses a lateral table and a TemporalTableFunction to implement a temporal table. A temporal table is a versioned table, and a lateral table is a table that keeps references to the previous table. If you do not want to use time versioning, you don't need the temporal table.
> 4. It is a kind of join. The join keyword can be omitted if it is an inner join. The grammar will not be changed in the near future; I haven't heard any news about changing it.
> 5. Yes, it will be optimized.
> 6. If you want to left join a temporal table, you can write SQL like:
> 
> SELECT
>   o.amount, o.currency, r.rate, o.amount * r.rate
> FROM
>   Orders AS o
>   LEFT JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
>   ON r.currency = o.currency
> 
> CC @Piotr Nowojski: it would be great to have your opinions here.
> 
> Best,
> Hequn
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> 
> 
> On Wed, Mar 13, 2019 at 1:59 PM 徐涛 <happydexutao@gmail.com> wrote:
> Hi Hequn,
> 	Thanks a lot for your answer! That is very helpful for me.
> 	I still have some questions about joining stream and dimension data and about the temporal table join:
> 	1. I found that the temporal table join is still driven by one stream. I do not know why the dimension data join has to be driven by one stream; why can it not be done by a two-stream join (the traditional stream-stream join)?
> 		Let me try to answer this myself: the two-stream join works by materializing the data of both streams in state, but due to state retention, the dimension data may be lost. I guess this is one reason; am I correct?
> 	2. Is Blink's stream and dimension data join based on the temporal table join?
> 	3. I think the lateral table join can also do a dimension join if I do not want to use time versioning. How do I choose between the temporal table join and the lateral table join?
> 	4. I found that the temporal table join in Flink uses the “LATERAL TABLE” grammar rather than “JOIN”. That is OK, but it is not as easy to use as “JOIN”. Will the community modify the grammar in future releases?
> 	5. In the following temporal table join statement, will joining the Orders table with Rates produce too much data before the WHERE clause takes effect? Will it be optimized?
> SELECT
>   o.amount * r.rate AS amount
> FROM
>   Orders AS o,
>   LATERAL TABLE (Rates(o.rowtime)) AS r
> WHERE r.currency = o.currency 
> 	6. How can I use a temporal table join to do a left join?
> 
> 
> Best
> Henry
> 
>> On Mar 13, 2019, at 12:02 AM, Hequn Cheng <chenghequn@gmail.com> wrote:
>> 
>> Hi Henry,
>> 
>> Yes, you are correct. Basically, there are two ways to join a temporal table. One is provided in Flink, and the other is provided in Blink, which has been pushed as a branch [1] in the Flink repo.
>> 
>> - Join a Temporal Table in Flink [2][3][4]
>> As the documentation says, a join with a temporal table joins an append-only table (left input/probe side) with a temporal table (right input/build side), i.e., a table that changes over time and tracks its changes. You need to define a temporal table function, which provides access to the state of the temporal table at a specific point in time. *Both rowtime and proctime are supported.*
>> - Join a Temporal Table in Blink [5]
>> Unlike the join in Flink, it can join an *append/upsert/retract* stream (left input/probe side) with a temporal table (right input/build side), i.e., a *remote dimension table* that changes over time. To access data in a temporal table, you need to define a TableSource with LookupableTableSource [6]. (You can download the Blink code and take a look at `HBase143TableSource`, which is an implementation of LookupableTableSource.) Currently, only proctime is supported.
>> 
>> I think you can choose one according to your scenario.
>> There are some useful examples in the documents listed below. They may be very helpful for you. Feel free to ask if you have any other questions.
>> 
>> Best,
>> Hequn
>> 
>> [1] https://github.com/apache/flink/tree/blink
>> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>> [3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
>> [4] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins
>> [5] https://flink-china.org/doc/blink/dev/table/streaming/joins.html#join-with-a-temporal-table
>> [6] https://flink-china.org/doc/blink/dev/table/sourcesinks#defining-a-tablesource-with-lookupable


Re: How to join stream and dimension data in Flink?

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Henry,

These are good questions!
I would rather not add the "temporal" and "lateral" prefixes in front of
"join". The temporal table is a concept orthogonal to the join. We should
say "join a temporal table" or "join a lateral table".
1. You can of course use a stream-stream join. Introducing the temporal
table not only makes the query simpler but also improves performance. More
detail can be found in [1].
2. Both joins are based on the concept of a temporal table, i.e., a table
joins a temporal table.
3. Yes, actually the join in Flink uses a lateral table and a
TemporalTableFunction to implement a temporal table. A temporal table is a
versioned table, and a lateral table is a table that keeps references to
the previous table. If you do not want to use time versioning, you don't
need the temporal table.
4. It is a kind of join. The join keyword can be omitted if it is an inner
join. The grammar will not be changed in the near future; I haven't heard
any news about changing it.
5. Yes, it will be optimized.
6. If you want to left join a temporal table, you can write SQL like:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  LEFT JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency
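
To make the intended result of such a query concrete, here is a small plain-Python sketch of a processing-time left join against the latest version of a rates table. It only models the semantics and says nothing about which Flink versions accept the syntax above; the event layout and the `left_join_latest` name are assumptions made up for the example.

```python
def left_join_latest(events):
    """Join each order with the rate that is current when the order is processed.

    `events` is in arrival order; each item is ('rate', currency, rate)
    or ('order', currency, amount).
    """
    latest_rates = {}
    for kind, currency, value in events:
        if kind == "rate":
            # A new version replaces the previous one for this currency.
            latest_rates[currency] = value
        else:
            rate = latest_rates.get(currency)
            converted = value * rate if rate is not None else None
            # Left join: the order is emitted even when no rate is known yet.
            yield (value, currency, rate, converted)


events = [
    ("order", "EUR", 10.0),  # arrives before any rate
    ("rate", "EUR", 1.25),
    ("order", "EUR", 10.0),
    ("rate", "EUR", 1.5),
    ("order", "EUR", 2.0),
]
rows = list(left_join_latest(events))
print(rows)
```

The first order is padded with empty rate columns because the dimension data had not arrived when it was processed, which is also why results under processing time depend on arrival order.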

CC @Piotr Nowojski <pi...@data-artisans.com>  Would be great to have your
opinions here.

Best,
Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table


On Wed, Mar 13, 2019 at 1:59 PM 徐涛 <ha...@gmail.com> wrote:

> Hi Hequn,
> Thanks a lot for your answer! That is very helpful for me.
> I still have some questions about stream and dimension data join and
> temporal table join:
> 1. I found the temporal table join is still a one stream driven join, I do
> not know why the dimension data join has to be done by one stream driven
> join, why it can not be done by two stream join(traditional stream-stream
> join)?
> I try to give an answer about it: two stream join is based on the
> mechanism that is materialize two stream data in state, but the due to
> state retention, the dimension data may be lost. I guess this is one
> reason, am I correct?
> 2. Is Blink`s stream and dimension data join based on temporal table join?
>         3. I think lateral table join can also do dimension join if I do
> not want to use time versioning. How to choose between temporal table join
> and lateral table join?
> 4. I found that the temporal table join in Flink use a “LATERAL TABLE”
> grammar, but not “JOIN”, it is OK but not easier to use than “JOIN”, will
> the community modify the grammar in future releases?
> 5. In the following temporal table join statement, will the Orders table
> join Rates produce too many data before the where clause take effects? Will
> it be optimized?
>
> *SELECT*
> *  o.amount * r.rate AS amount*
> *FROM*
> *  Orders AS o,*
> *  LATERAL TABLE (Rates(o.rowtime)) AS r*
> *WHERE r.currency = o.currency *
>
> 6. How to use temporal table join to do left join?
>
>
> Best
> Henry
>
> 在 2019年3月13日,上午12:02,Hequn Cheng <ch...@gmail.com> 写道:
>
> Hi Henry,
>
> Yes, you are correct. Basically, there are two ways you can use to join a
> Temporal Table. One is provided in Flink and the other is provided in Blink
> which has been pushed as a branch[1] in Flink repo.
>
> - Join a Temporal Table in Flink[2][3][4]
> As the document said: it is a join with a temporal table joins an
> append-only table (left input/probe side) with a temporal table (right
> input/build side), i.e., a table that changes over time and tracks its
> changes. You need to define a temporal table function and it will be used
> to provide access to the state of a temporal table at a specific point in
> time. *Both rowtime and proctime are supported.*
> - Join a Temporal Table in Blink[5]
> Different from the join in Flink, it can join an *append/upsert/retract*
> stream (left input/probe side) with a temporal table (right input/build
> side), i.e., a *remote dimension table* that changes over time. In order to
> access data in a temporal table, you need to define a TableSource with
> LookupableTableSource[6](Probably you can download the code of blink and
> take a look at the `HBase143TableSource` which is an implementation of
> LookupableTableSource). Currently, only proctime is supported.
>
> I think you can choose one according to your scenarios.
> There are some useful examples in the document I list below. They may be
> very helpful for you. Feel free to ask if you have any other questions.
>
> Best,
> Hequn
>
> [1] https://github.com/apache/flink/tree/blink
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>
> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
> [4]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins
> [5]
> https://flink-china.org/doc/blink/dev/table/streaming/joins.html#join-with-a-temporal-table
> [6]
> https://flink-china.org/doc/blink/dev/table/sourcesinks#defining-a-tablesource-with-lookupable
>
> On Tue, Mar 12, 2019 at 2:13 PM 徐涛 <ha...@gmail.com> wrote:
>
>> Hi Hequn,
>> I want to implement stream join dimension in Flink SQL, I found there is
>> a new feature named Temporal Tables delivered by Flink1.7, I think it maybe
>> could be used to achieve the join between stream and dimension table. But I
>> am not sure about that. Could anyone help me about it?
>> Thanks a lot for your help.
>>
>> Best
>> Henry
>>
>> 在 2018年9月26日,上午12:16,Hequn Cheng <ch...@gmail.com> 写道:
>>
>> Hi vino,
>>
>> Thanks for sharing the link. It's a great book and I will take a look.
>> There are kinds of join. Different joins have different semantics. From
>> the link, I think it means the time versioned join.  FLINK-9712
>> <https://issues.apache.org/jira/browse/FLINK-9712> enrichments joins
>> with Time Versioned Functions and the result is deterministic under
>> eventime.
>>
>> Best, Hequn
>>
>> On Tue, Sep 25, 2018 at 11:05 PM vino yang <ya...@gmail.com> wrote:
>>
>>> Hi Hequn,
>>>
>>> The specific content of the book does not give a right or wrong
>>> conclusion, but it illustrates this phenomenon: two streams of the same
>>> input, playing and joining at the same time, due to the order of events,
>>> the connection results are uncertain. This is because the two streams are
>>> intertwined in different forms. This has nothing to do with orderby, just
>>> that it exists in the stream stream join. Of course, this phenomenon is
>>> only a comparison statement with a non-stream join.
>>>
>>> In addition, I recommend this book, which is very famous on Twitter and
>>> Amazon. Because you are also Chinese, there is a good translation here. If
>>> I guess it is correct, the main translator is also from your company. This
>>> part of what I mentioned is here.[1]
>>>
>>> [1]:
>>> https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
>>>
>>> Thanks, vino.
>>>
>>> Hequn Cheng <ch...@gmail.com> 于2018年9月25日周二 下午9:45写道:
>>>
>>>> Hi vino,
>>>>
>>>> There are no order problems of stream-stream join in Flink. No matter
>>>> what order the elements come, stream-stream join in Flink will output
>>>> results which consistent with standard SQL semantics. I haven't read the
>>>> book you mentioned. For join, it doesn't guarantee output orders. You have
>>>> to do orderBy if you want to get ordered results.
>>>>
>>>> Best, Hequn
>>>>
>>>> On Tue, Sep 25, 2018 at 8:36 PM vino yang <ya...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> I may not have stated it clearly. There is no semantic problem at the
>>>>> Flink implementation level. Rather, there may be “time-dependence” here. [1]
>>>>>
>>>>> Yes, my initial answer was not to use this form of join in this
>>>>> scenario, but Henry said he converted the table into a stream table and
>>>>> asked about the feasibility of other methods.
>>>>>
>>>>> [1]: 《Designing Data-Intensive Applications》By Martin Kleppmann, Part
>>>>> 3: Derived Data, Chapter 11: Stream Processing , Stream Joins.
>>>>>
>>>>> Some content:
>>>>>
>>>>> *If the ordering of events across streams is undetermined, the join
>>>>> becomes nondeterministic [87], which means you cannot rerun the same job
>>>>> on the same input and necessarily get the same result: the events on the
>>>>> input streams may be interleaved in a different way when you run the job
>>>>> again.*
>>>>>
>>>>>
>>>>> On Tue, Sep 25, 2018 at 8:08 PM Fabian Hueske <fh...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I don't think that using the current join implementation in the Table
>>>>>> API / SQL will work.
>>>>>> The non-windowed join fully materializes *both* input tables in
>>>>>> state. This is necessary, because the join needs to be able to process
>>>>>> updates on either side.
>>>>>> While this is not a problem for the fixed-size MySQL table,
>>>>>> materializing the append-only table (aka stream) is probably not what you
>>>>>> want.
>>>>>> You also cannot limit idle state retention, because that would remove
>>>>>> the MySQL table from state at some point.
>>>>>>
>>>>>> The only way to make it work is using a user-defined TableFunction
>>>>>> that queries the MySQL table via JDBC.
>>>>>> However, please note that these calls would be synchronous, blocking
>>>>>> calls.
>>>>>>
>>>>>> @Vino: Why do you think that the stream & stream join is not mature
>>>>>> and which problems do you see in the semantics?
>>>>>> The semantics are correct (standard SQL semantics) and in my opinion
>>>>>> the implementation is also mature.
>>>>>> However, you should not use the non-windowed join if any of the input
>>>>>> tables is ever-growing, because both sides must be held in state. This is
>>>>>> not an issue of the semantics.
>>>>>>
>>>>>> Cheers,
>>>>>> Fabian
>>>>>>
>>>>>> On Tue, Sep 25, 2018 at 2:00 PM vino yang <yanghua1127@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Henry,
>>>>>>>
>>>>>>> 1) I don't really recommend this method, but you said that you want to
>>>>>>> convert the MySQL table to a stream and then to a Flink table. Under
>>>>>>> this premise, I said that you can do this by joining two stream tables.
>>>>>>> But as you know, this join depends on how long the state is retained.
>>>>>>> To make it equivalent to a dimension table, you must permanently keep
>>>>>>> the state of the stream table that is defined as the "dimension table".
>>>>>>> I only said that this can be done by modifying the relevant
>>>>>>> configuration in Flink, not that it can be set for a single table.
>>>>>>>
>>>>>>> 2) Imagine that there are one million records in two tables. The
>>>>>>> records of both tables have only just begun to stream into Flink, and
>>>>>>> the records of the dimension table have not fully arrived. Therefore,
>>>>>>> your matching results may not be as accurate as directly querying MySQL.
>>>>>>>
>>>>>>> In fact, the current stream & stream join is not very mature and there
>>>>>>> are some problems in semantics, so I personally recommend that you go
>>>>>>> back to the stream/batch (MySQL) join. For more on the principles, I
>>>>>>> recommend a book, referred to as 《DDIA》.
>>>>>>>
>>>>>>> Thanks, vino.
>>>>>>>
>>>>>>> On Tue, Sep 25, 2018 at 5:48 PM 徐涛 <ha...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Vino,
>>>>>>>> I do not quite understand some of the sentences below. Would you
>>>>>>>> please explain them in a bit more detail?
>>>>>>>> 1. “*such as setting the state retention time of one of the tables
>>>>>>>> to be permanent*”: as far as I know, the state retention time is a
>>>>>>>> global config, so I cannot set this property per table.
>>>>>>>> 2. “*you may not be able to match the results, because the data
>>>>>>>> belonging to the mysql table is just beginning to play as a stream*”:
>>>>>>>> why is it not able to match the results?
>>>>>>>>
>>>>>>>> Best
>>>>>>>> Henry
>>>>>>>>
>>>>>>>> On Sep 25, 2018, at 5:29 PM, vino yang <ya...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Henry,
>>>>>>>>
>>>>>>>> If you have converted the mysql table to a flink stream table, then
>>>>>>>> in flink table/sql a stream-stream join can also do this, such as setting
>>>>>>>> the state retention time of one of the tables to be permanent. But when the
>>>>>>>> job has just started, you may not be able to match the results, because the
>>>>>>>> data belonging to the mysql table is just beginning to play as a stream.
>>>>>>>>
>>>>>>>> Thanks, vino.
>>>>>>>>
>>>>>>>> On Tue, Sep 25, 2018 at 5:10 PM 徐涛 <ha...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Vino & Hequn,
>>>>>>>>> I am now using the Table/SQL API. If I import the MySQL table as a
>>>>>>>>> stream and then convert it into a table, it seems that this can also be a
>>>>>>>>> workaround for batch/streaming joining. May I ask how this differs
>>>>>>>>> from the UDTF method? Does this implementation have any defects?
>>>>>>>>> Best
>>>>>>>>> Henry
>>>>>>>>>
>>>>>>>>> On Sep 22, 2018, at 10:28 AM, Hequn Cheng <ch...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>>
>>>>>>>>> +1 for vino's answer.
>>>>>>>>> Also, this kind of join will be supported in FLINK-9712
>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-9712>. You can check
>>>>>>>>> more details in the jira.
>>>>>>>>>
>>>>>>>>> Best, Hequn
>>>>>>>>>
>>>>>>>>> On Fri, Sep 21, 2018 at 4:51 PM vino yang <ya...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Henry,
>>>>>>>>>>
>>>>>>>>>> There are three ways I can think of:
>>>>>>>>>>
>>>>>>>>>> 1) use DataStream API, implement a flatmap UDF to access
>>>>>>>>>> dimension table;
>>>>>>>>>> 2) use table/sql API, implement a UDTF to access dimension table;
>>>>>>>>>> 3) customize the table/sql join API/statement's implementation
>>>>>>>>>> (and change the physical plan)
>>>>>>>>>>
>>>>>>>>>> Thanks, vino.
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 21, 2018 at 4:43 PM 徐涛 <ha...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>         Sometimes a “dimension table” needs to be joined with the
>>>>>>>>>>> "fact table" if the data are not joined before being sent to Kafka.
>>>>>>>>>>>         So if the data are joined in Flink, does the “dimension
>>>>>>>>>>> table” have to be imported as a stream, or are there other ways to
>>>>>>>>>>> achieve it?
>>>>>>>>>>>         Thanks a lot!
>>>>>>>>>>>
>>>>>>>>>>> Best
>>>>>>>>>>> Henry
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>
>

Re: How to join stream and dimension data in Flink?

Posted by 徐涛 <ha...@gmail.com>.
Hi Hequn,
	Thanks a lot for your answer! That is very helpful for me.
	I still have some questions about the stream and dimension data join and the temporal table join:
	1. The temporal table join is still driven by a single stream. Why does the dimension data join have to be driven by one stream? Why can it not be done as a two-stream join (the traditional stream-stream join)?
		My own attempt at an answer: a two-stream join materializes the data of both streams in state, but because of state retention the dimension data may be lost. I guess this is one reason; am I correct?
	2. Is Blink's stream and dimension data join based on the temporal table join?
	3. I think a lateral table join can also do a dimension join if I do not need time versioning. How should I choose between the temporal table join and the lateral table join?
	4. The temporal table join in Flink uses the “LATERAL TABLE” syntax rather than “JOIN”. It works, but it is less convenient than “JOIN”. Will the community change the syntax in future releases?
	5. In the following temporal table join statement, will joining Orders with Rates produce too much data before the WHERE clause takes effect? Will it be optimized?
SELECT
  o.amount * r.rate AS amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency 
	6. How can I use the temporal table join to do a left join?
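
To make the intended result of the statement in question 5 concrete, here is a minimal plain-Python model of its semantics. It is only a sketch of what the query computes, not of how Flink plans or optimizes it. The sample data and the helper names `build_versions`, `rates_at` and `temporal_join` are hypothetical, and an inner join is assumed, so orders without a matching rate are dropped.

```python
from bisect import bisect_right
from collections import defaultdict

# Hypothetical changelog of the Rates table: (rowtime, currency, rate).
RATES_HISTORY = [
    (1, "USD", 102),
    (1, "EUR", 114),
    (5, "EUR", 116),
    (7, "USD", 105),
]

# Hypothetical append-only Orders stream: (rowtime, amount, currency).
ORDERS = [
    (2, 10, "EUR"),
    (6, 10, "EUR"),
    (8, 2, "USD"),
    (9, 1, "GBP"),  # no rate is known for GBP
]


def build_versions(history):
    """Index rate versions per currency, sorted by the time they became valid."""
    versions = defaultdict(list)
    for rowtime, currency, rate in sorted(history):
        versions[currency].append((rowtime, rate))
    return versions


def rates_at(versions, currency, time):
    """Return the rate valid for `currency` at `time`, or None if none exists yet."""
    entries = versions.get(currency, [])
    # Index of the last version whose start time is <= time.
    idx = bisect_right([t for t, _ in entries], time) - 1
    return entries[idx][1] if idx >= 0 else None


def temporal_join(orders, history):
    """Model of Rates(o.rowtime) joined to Orders on currency (inner join)."""
    versions = build_versions(history)
    result = []
    for rowtime, amount, currency in orders:
        rate = rates_at(versions, currency, rowtime)
        if rate is not None:  # unmatched orders are dropped
            result.append(amount * rate)
    return result


converted = temporal_join(ORDERS, RATES_HISTORY)
print(converted)
```

In this model each order matches at most one version of Rates, so the result can never contain more rows than Orders. Whether Flink executes the statement this way is a question for the documentation and the community. A left join (question 6) would, in the same model, keep the GBP order with an empty amount instead of dropping it.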


Best
Henry



Re: How to join stream and dimension data in Flink?

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Henry,

Yes, you are correct. Basically, there are two ways to join a Temporal
Table. One is provided in Flink, and the other is provided in Blink, which
has been pushed as a branch[1] in the Flink repo.

- Join a Temporal Table in Flink[2][3][4]
As the documentation says, a join with a temporal table joins an
append-only table (left input/probe side) with a temporal table (right
input/build side), i.e., a table that changes over time and tracks its
changes. You need to define a temporal table function, which provides
access to the state of the temporal table at a specific point in
time. *Both rowtime and proctime are supported.*
- Join a Temporal Table in Blink[5]
Unlike the join in Flink, it can join an *append/upsert/retract*
stream (left input/probe side) with a temporal table (right input/build
side), i.e., a *remote dimension table* that changes over time. To
access data in a temporal table, you need to define a TableSource that
implements LookupableTableSource[6]. (You can download the Blink code and
take a look at `HBase143TableSource`, which is an implementation of
LookupableTableSource.) Currently, only proctime is supported.
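
The second approach above, a processing-time lookup against a remote dimension table, can be sketched in plain Python. This is only a model of the idea and not Blink's LookupableTableSource API: the class `DimensionLookup`, the function `lookup_join`, the TTL cache and the sample data are all hypothetical, and a dict stands in for the external store.

```python
import time


class DimensionLookup:
    """Hypothetical stand-in for a remote dimension table with a TTL cache."""

    def __init__(self, remote_table, ttl_seconds=60.0, clock=time.monotonic):
        self._remote = remote_table  # dict playing the role of MySQL/HBase
        self._ttl = ttl_seconds
        self._clock = clock
        self._cache = {}             # key -> (expiry time, row)
        self.remote_calls = 0

    def lookup(self, key):
        now = self._clock()
        cached = self._cache.get(key)
        if cached is not None and cached[0] > now:
            return cached[1]
        # Cache miss or expired entry: query the remote table.
        # In this sketch the call is synchronous and blocking.
        self.remote_calls += 1
        row = self._remote.get(key)
        self._cache[key] = (now + self._ttl, row)
        return row


def lookup_join(stream, dim):
    """Enrich each record with the dimension row visible at processing time."""
    for record in stream:
        row = dim.lookup(record["user_id"])
        if row is not None:  # inner join: records without a match are dropped
            yield {**record, **row}


users = {1: {"name": "alice"}, 2: {"name": "bob"}}
clicks = [
    {"user_id": 1, "url": "/a"},
    {"user_id": 3, "url": "/b"},
    {"user_id": 1, "url": "/c"},
]

dim = DimensionLookup(users)
enriched = list(lookup_join(clicks, dim))
print(enriched)
```

Only the probe side (the stream) drives the join here, and nothing from the stream is kept in state. The dimension row is whatever the remote table, or the cache, returns at the moment the record is processed, which is why such a join is tied to processing time.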

You can choose one according to your scenario.
The documents listed below contain some useful examples that may be
helpful. Feel free to ask if you have any other questions.

Best,
Hequn

[1] https://github.com/apache/flink/tree/blink
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
[3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
[4] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins
[5] https://flink-china.org/doc/blink/dev/table/streaming/joins.html#join-with-a-temporal-table
[6] https://flink-china.org/doc/blink/dev/table/sourcesinks#defining-a-tablesource-with-lookupable
