Posted to issues@flink.apache.org by "godfrey he (Jira)" <ji...@apache.org> on 2020/02/16 13:38:00 UTC
[jira] [Comment Edited] (FLINK-16070) Blink planner can not extract correct unique key for UpsertStreamTableSink
[ https://issues.apache.org/jira/browse/FLINK-16070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037832#comment-17037832 ]
godfrey he edited comment on FLINK-16070 at 2/16/20 1:37 PM:
-------------------------------------------------------------
Thanks for reporting this bug, [~Leonard Xu].
Hi [~danny0405], the constant value is already in the grouping key. The plan is:
{code:java}
Calc(select=[CAST(_UTF-16LE'ZL_001':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS aggId, pageId, ts, CAST(expoCnt) AS expoCnt, CAST(clkCnt) AS clkCnt])
+- GroupAggregate(groupBy=[aggId, pageId, ts], select=[aggId, pageId, ts, COUNT($f3) AS expoCnt, COUNT($f4) AS clkCnt])
   +- Exchange(distribution=[hash[aggId, pageId, ts]])
      +- Calc(select=[_UTF-16LE'ZL_001' AS aggId, pageId, ts2Date(recvTime) AS ts, CASE(=(eventId, _UTF-16LE'exposure':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 1, null:INTEGER) AS $f3, CASE(=(eventId, _UTF-16LE'click':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 1, null:INTEGER) AS $f4], where=[OR(=(eventId, _UTF-16LE'exposure'), =(eventId, _UTF-16LE'click'))])
         +- TableSourceScan(table=[[default_catalog, default_database, csv, source: [CsvTableSource(read fields: pageId, eventId, recvTime)]]], fields=[pageId, eventId, recvTime])
{code}
I think there are two bugs:
One is in {{FlinkRelMdUniqueKeys}}: project/calc should handle constant fields when deriving unique keys.
The other is that the constant value should be removed from the grouping key after {{AggregateProjectPullUpConstantsRule}} is applied.
I would like to fix these bugs.
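To make the two points concrete, here is a minimal Java sketch of how unique keys could be carried through a project/calc when some input columns are constant, plus a helper for the grouping-key point. It is not the code in {{FlinkRelMdUniqueKeys}} or {{AggregateProjectPullUpConstantsRule}}: the class UniqueKeySketch, both method names, and the assumption that the planner already knows which input columns are constant are hypothetical, and columns are modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class UniqueKeySketch {

    // Hypothetical helper (not Flink's implementation): derives the unique keys of a
    // projection's output from the unique keys of its input.
    // inputToOutput maps each input column that is forwarded unchanged to its output name.
    static Set<Set<String>> uniqueKeysThroughProject(
            Set<Set<String>> inputKeys,
            Map<String, String> inputToOutput,
            Set<String> constantInputCols) {
        Set<Set<String>> outputKeys = new LinkedHashSet<>();
        for (Set<String> key : inputKeys) {
            Set<String> mapped = new LinkedHashSet<>();
            boolean preserved = true;
            for (String col : key) {
                if (constantInputCols.contains(col)) {
                    // A constant column holds the same value in every row, so it
                    // cannot distinguish rows and can be dropped from the key.
                    continue;
                }
                String out = inputToOutput.get(col);
                if (out == null) {
                    // A non-constant key column was projected away: this key is lost.
                    preserved = false;
                    break;
                }
                mapped.add(out);
            }
            if (preserved) {
                outputKeys.add(mapped);
            }
        }
        return outputKeys;
    }

    // Hypothetical helper: drops constant columns from a grouping key. One column is
    // kept if all of them are constant, because an aggregate without any grouping key
    // is a global aggregate and emits a row even for empty input.
    static List<String> pruneConstantGroupingKeys(List<String> groupBy, Set<String> constants) {
        List<String> kept = new ArrayList<>();
        for (String col : groupBy) {
            if (!constants.contains(col)) {
                kept.add(col);
            }
        }
        if (kept.isEmpty() && !groupBy.isEmpty()) {
            kept.add(groupBy.get(0));
        }
        return kept;
    }

    public static void main(String[] args) {
        // Unique key of the GroupAggregate output in the plan above: its grouping key.
        Set<Set<String>> aggKeys = new LinkedHashSet<>();
        aggKeys.add(new LinkedHashSet<>(Arrays.asList("aggId", "pageId", "ts")));

        // The top Calc forwards these columns; aggId is re-created from a literal
        // and is therefore not a plain reference to the input column.
        Map<String, String> forwarded = new HashMap<>();
        for (String c : Arrays.asList("pageId", "ts", "expoCnt", "clkCnt")) {
            forwarded.put(c, c);
        }

        Set<String> noConstants = new HashSet<>();
        Set<String> constants = new HashSet<>(Arrays.asList("aggId"));

        // Without constant handling the key is lost.
        System.out.println(uniqueKeysThroughProject(aggKeys, forwarded, noConstants)); // []
        // With constant handling the key survives.
        System.out.println(uniqueKeysThroughProject(aggKeys, forwarded, constants)); // [[pageId, ts]]
        System.out.println(
                pruneConstantGroupingKeys(Arrays.asList("aggId", "pageId", "ts"), constants)); // [pageId, ts]
    }
}
```

For the plan above, the naive derivation loses the key because the top Calc replaces aggId with a literal, while the constant-aware version still reports (pageId, ts), which is the kind of key an upsert sink needs.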
> Blink planner can not extract correct unique key for UpsertStreamTableSink
> ---------------------------------------------------------------------------
>
> Key: FLINK-16070
> URL: https://issues.apache.org/jira/browse/FLINK-16070
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.0
> Reporter: Leonard Xu
> Priority: Critical
> Fix For: 1.10.1
>
>
> I reproduced an Elasticsearch6UpsertTableSink issue that a user reported on the mailing list [1]: the Blink planner cannot extract the correct unique key for the following query, while the legacy planner works well.
> {code:java}
> // user code
> INSERT INTO ES6_ZHANGLE_OUTPUT
> SELECT aggId, pageId, ts_min as ts,
>        count(case when eventId = 'exposure' then 1 else null end) as expoCnt,
>        count(case when eventId = 'click' then 1 else null end) as clkCnt
> FROM (
>     SELECT
>         'ZL_001' as aggId,
>         pageId,
>         eventId,
>         recvTime,
>         ts2Date(recvTime) as ts_min
>     from kafka_zl_etrack_event_stream
>     where eventId in ('exposure', 'click')
> ) as t1
> group by aggId, pageId, ts_min
> {code}
> I found that the Blink planner cannot extract the correct unique key in `*FlinkRelMetadataQuery.getUniqueKeys(relNode)*`, while the legacy planner works well in `*org.apache.flink.table.plan.util.UpdatingPlanChecker.getUniqueKeyFields(...)*`. A simple ETL job that reproduces this issue is available at [2].
>
> [1][http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-10-es-sink-exception-td32773.html]
> [2][https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/java/kafka2es/Kafka2UpsertEs.java]
>
>