Posted to dev@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/11/26 08:37:00 UTC
[jira] [Created] (FLINK-20369) Improve the digest of TableSourceScan and Sink node
Jark Wu created FLINK-20369:
-------------------------------
Summary: Improve the digest of TableSourceScan and Sink node
Key: FLINK-20369
URL: https://issues.apache.org/jira/browse/FLINK-20369
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Jark Wu
Currently:
1. The digest of {{TableSourceScan}} and {{Sink}} doesn't contain the connector information, which would be quite useful when debugging.
2. The table name is quite verbose when the table is under the default catalog and database; it would be better to show only the table name in that case.
3. It may be nicer to include the changelog mode of the source and sink, because it is meta information from {{DynamicTableSource/Sink#getChangelogMode}}.
{code}
Sink(table=[default_catalog.default_database.sink_kafka_count_city], fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
+- Calc(select=[city_name, CAST(count_customer) AS count_customer, CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
   +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, count_customer, sum_gender, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
      :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
      :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS sum_gender], changelogMode=[I,UA,D])
      :     +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
      :        +- LocalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], changelogMode=[I])
      :           +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
      :              +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
      :                 +- Exchange(distribution=[hash[customer_id]], changelogMode=[UA,D])
      :                    +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
      :                       +- TableSourceScan(table=[[default_catalog, default_database, source_customer]], fields=[customer_id, city_id, age, gender, update_time], changelogMode=[UA,D])
      +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
         +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
            +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
               +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
                  +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
{code}
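The table-name simplification suggested in point 2 could look roughly like the sketch below. This is only an illustration of the idea, not the planner's actual code: the class {{DigestTableNameSketch}} and the method {{simplify}} are hypothetical names, and it assumes the table path is available as a three-part list (catalog, database, table) together with the session's current default catalog and database.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration; not part of Flink's planner API. */
final class DigestTableNameSketch {

    /**
     * Returns only the table name when the path is a three-part path under the
     * given default catalog and database; otherwise returns the dot-joined path.
     */
    static String simplify(List<String> path, String defaultCatalog, String defaultDatabase) {
        if (path.size() == 3
                && path.get(0).equals(defaultCatalog)
                && path.get(1).equals(defaultDatabase)) {
            return path.get(2);
        }
        // Anything outside the defaults keeps its fully qualified name.
        return String.join(".", path);
    }

    public static void main(String[] args) {
        // Table under the default catalog and database: prints "source_city".
        System.out.println(simplify(
                Arrays.asList("default_catalog", "default_database", "source_city"),
                "default_catalog", "default_database"));
        // Table elsewhere (made-up path): prints "my_catalog.db1.orders".
        System.out.println(simplify(
                Arrays.asList("my_catalog", "db1", "orders"),
                "default_catalog", "default_database"));
    }
}
```

Keeping the full path for anything outside the defaults means two tables with the same name in different databases would still be distinguishable in the digest.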
--
This message was sent by Atlassian Jira
(v8.3.4#803005)