Posted to user@flink.apache.org by 徐涛 <ha...@gmail.com> on 2018/08/10 15:06:55 UTC

flink requires table key when insert into upsert table sink

Hi All,
	I am using Flink 1.6 to build some real-time programs, and I want to write the output to a table sink; the code is below. At first I used an append table sink, but its error message told me to use an upsert table sink instead, so I wrote one. Then another error comes out and blocks me: “Caused by: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.” My question is: how do I define the table's keys in this scenario? I also checked the exception stack and found that the system infers the key fields via
	val tableKeys: Option[Array[String]] = UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan)
and I wonder how to make that function return a defined value.
	Thanks a lot!
    val praise = tableEnv.sqlQuery("SELECT article_id, hll(uid) AS PU FROM praise " +
      "GROUP BY HOP(updated_time, INTERVAL '1' MINUTE, INTERVAL '3' DAY), article_id")
    tableEnv.registerTable("praiseAggr", praise)

    val comment = tableEnv.sqlQuery("SELECT article_id, hll(from_uid) AS CU FROM comment " +
      "GROUP BY HOP(updated_time, INTERVAL '1' MINUTE, INTERVAL '3' DAY), article_id")
    tableEnv.registerTable("commentAggr", comment)

    val reader = tableEnv.sqlQuery("SELECT article_id, hll(uid) AS RU FROM reader " +
      "GROUP BY HOP(updated_time, INTERVAL '1' MINUTE, INTERVAL '3' DAY), article_id")
    tableEnv.registerTable("readerAggr", reader)

    // sqlUpdate returns Unit, so there is no Table to assign here
    tableEnv.sqlUpdate(s"INSERT INTO $sinkTableName " +
      "SELECT p.article_id, p.PU, c.CU, r.RU FROM praiseAggr p " +
      "FULL OUTER JOIN commentAggr c ON p.article_id = c.article_id " +
      "FULL OUTER JOIN readerAggr r ON c.article_id = r.article_id")

Thanks,
Henry Xu

Re: flink requires table key when insert into upsert table sink

Posted by Hequn Cheng <ch...@gmail.com>.
Hi,

*> Could you give an example of a query that has a unique key?*

Consider the following SQL:

SELECT a, SUM(b) AS d
FROM Orders
GROUP BY a


The result table has a unique key on a. The documentation on Streaming
Concepts [1] may be helpful for you.
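
For example, a keyed result like this can be emitted with an upsert sink (a
sketch only; MyUpsertSink stands for your own UpsertStreamTableSink[Row]
implementation, it is not a Flink class):

val result = tableEnv.sqlQuery("SELECT a, SUM(b) AS d FROM Orders GROUP BY a")
// The planner infers the unique key 'a' from the GROUP BY and hands it to
// the sink via setKeyFields() when the query is translated.
result.writeToSink(new MyUpsertSink)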

*> By what mechanism does Flink infer which field(s) are the unique key?*

Currently (Flink 1.6.0), Flink SQL derives unique keys only from GROUP BY,
and the unique-key information can be passed on to downstream operators,
for example a SELECT.
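
To make this concrete, a sketch (reusing the Orders example above and the
tables from your original program):

// Key survives: 'a' is the GROUP BY key and stays fully selected, so the
// result keeps a unique key on 'a' through the projection.
val keyed = tableEnv.sqlQuery("SELECT a, SUM(b) AS d FROM Orders GROUP BY a")

// Key lost: keys are generated only by GROUP BY, so no unique key can be
// derived for the result of the FULL OUTER JOINs in your program, and
// UpdatingPlanChecker.getUniqueKeyFields returns None.
val joined = tableEnv.sqlQuery("SELECT p.article_id, p.PU, c.CU FROM praiseAggr p " +
  "FULL OUTER JOIN commentAggr c ON p.article_id = c.article_id")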

*Implement a RetractStreamTableSink*

Since outer joins output updates without a unique key, you can use a
RetractStreamTableSink to emit the data. There is documentation about
implementing a table sink [2].
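
A minimal sketch of such a sink against the Flink 1.6 interfaces (the class
name PrintRetractSink and its print-only behavior are placeholders for your
own logic, not anything from Flink):

import java.lang.{Boolean => JBool}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.sinks.{RetractStreamTableSink, TableSink}
import org.apache.flink.types.Row

// Each record is a (flag, row) pair: flag == true means accumulate (add the
// row), flag == false means retract a previously emitted row.
class PrintRetractSink extends RetractStreamTableSink[Row] {

  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _

  override def getRecordType: TypeInformation[Row] =
    new RowTypeInfo(fieldTypes, fieldNames)

  override def getFieldNames: Array[String] = fieldNames

  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes

  // The planner calls configure() with the schema of the table to emit.
  override def configure(
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = {
    val copy = new PrintRetractSink
    copy.fieldNames = fieldNames
    copy.fieldTypes = fieldTypes
    copy
  }

  override def emitDataStream(stream: DataStream[JTuple2[JBool, Row]]): Unit = {
    // Replace print() with a writer for your external system, applying
    // retractions (flag == false) as deletes or decrements there.
    stream.print()
  }
}

You would then register it with the schema of the query result and keep the
original INSERT INTO statement, e.g.
tableEnv.registerTableSink(sinkTableName, fieldNames, fieldTypes, new PrintRetractSink).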

Best, Hequn

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#streaming-concepts
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink


On Sat, Aug 11, 2018 at 6:02 AM, 徐涛 <ha...@gmail.com> wrote:

> [...]

Re: flink requires table key when insert into upsert table sink

Posted by 徐涛 <ha...@gmail.com>.
Hi Fabian,
	Could you give an example of a query that has a unique key?
	By what mechanism does Flink infer which field(s) are the unique key?
	Thanks a lot!

Best, Henry

> On Aug 11, 2018, at 5:21 AM, Fabian Hueske <fh...@gmail.com> wrote:
> [...]


Re: flink requires table key when insert into upsert table sink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Henry,

The problem is that the table resulting from the query does not have a
unique key. You can only use an upsert sink if the table has a (composite)
unique key; since that is not the case here, you cannot use an upsert sink.
However, you can implement a RetractStreamTableSink, which allows you to
write any kind of Table (append-only/update, keyed/non-keyed) to an
external system.
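
If you want to inspect that change stream before writing a sink, the Scala
Table API can convert the table directly; a quick sketch using your join
query (assuming the registered tables and a StreamTableEnvironment named
tableEnv):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val result = tableEnv.sqlQuery("SELECT p.article_id, p.PU, c.CU, r.RU FROM praiseAggr p " +
  "FULL OUTER JOIN commentAggr c ON p.article_id = c.article_id " +
  "FULL OUTER JOIN readerAggr r ON c.article_id = r.article_id")

// Each element is (accumulate, row): true adds the row, false retracts a
// previously emitted version of it.
tableEnv.toRetractStream[Row](result).print()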

Best, Fabian

2018-08-10 17:06 GMT+02:00 徐涛 <ha...@gmail.com>:

> [...]