Posted to user-zh@flink.apache.org by Jason_H <hy...@163.com> on 2022/11/11 03:10:03 UTC
flinksql join
hi everyone,
I'm implementing a dimension-table join in Flink SQL. The source is Kafka (transaction data) and the dimension table is in MySQL (accounts). The problem: when a record arrives from Kafka and its account is not yet in the dimension table, I insert the account manually; the next record then matches the account info, but the accumulated result is missing the records that arrived before the match. For example:

Kafka input:
Account  Amount  Count
1111     100     1    -> no match
1111     100     1    -> no match
1111     100     1    -> matched

Dimension table:
Account  Company
2222     BBBB
1111     AAAA   -> account inserted later

Actual output:
Company  Amount  Count
AAAA     100     1

Expected output:
Company  Amount  Count
AAAA     300     3

The SQL is as follows:
String sql2 = "insert into dws_b2b_trade_year_index\n" +
"WITH temp AS (\n" +
"select \n" +
" ta.gmtStatistical as gmtStatistical,\n" +
" ta.paymentMethod as paymentMethod,\n" +
" tb.CORP_ID as outCorpId,\n" +
" tc.CORP_ID as inCorpId,\n" +
" sum(ta.tradeAmt) as tranAmount,\n" +
" sum(ta.tradeCnt) as tranNum \n" +
"from dws_a2a_trade_year_index ta \n" +
"left join dob_dim_account for system_time as of ta.proc as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
"left join dob_dim_account for system_time as of ta.proc as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
"group by \n" +
" ta.gmtStatistical, \n" +
" ta.paymentMethod, \n" +
" tb.CORP_ID, \n" +
" tc.CORP_ID \n" +
") \n" +
"SELECT \n" +
" DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
" gmtStatistical, \n" +
" paymentMethod, \n" +
" outCorpId, \n" +
" inCorpId, \n" +
" tranAmount, \n" +
" tranNum \n" +
"FROM temp";
Jason_H
hyb_hello@163.com
Re: flinksql join
Posted by Jason_H <hy...@163.com>.
I tried a regular join:
String sql2 = "insert into dws_b2b_trade_year_index\n" +
"WITH temp AS (\n" +
"select \n" +
" ta.gmtStatistical as gmtStatistical,\n" +
" ta.paymentMethod as paymentMethod,\n" +
" tb.CORP_ID as outCorpId,\n" +
" tc.CORP_ID as inCorpId,\n" +
" sum(ta.tradeAmt) as tranAmount,\n" +
" sum(ta.tradeCnt) as tranNum \n" +
"from dws_a2a_trade_year_index ta \n" +
"left join dob_dim_account as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
"left join dob_dim_account as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
"group by \n" +
" ta.gmtStatistical, \n" +
" ta.paymentMethod, \n" +
" tb.CORP_ID, \n" +
" tc.CORP_ID \n" +
") \n" +
"SELECT \n" +
" DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
" gmtStatistical, \n" +
" paymentMethod, \n" +
" outCorpId, \n" +
" inCorpId, \n" +
" tranAmount, \n" +
" tranNum \n" +
"FROM temp";
This is how I create the MySQL table:
String accountDimSource = "CREATE TABLE dob_dim_account (\n" +
" DIM_ACCOUNT_ID string ,\n" +
" GMT_CREATE string ,\n" +
" ts_date AS TO_TIMESTAMP(GMT_CREATE), \n" +
" GMT_UPDATE string ,\n" +
" ACCT_CODE string ,\n" +
" CUST_ID string ,\n" +
" CUST_NAME string ,\n" +
" CORP_ID string ,\n" +
" CORP_CERT_CODE string ,\n" +
" CORP_CERT_TYPE string ,\n" +
" CUST_MANAGER_JOB_CODE string ,\n" +
" TEAM_CODE string ,\n" +
" ORG_ID string, \n" +
" SUPER_ORG_ID string, \n" +
" IS_OUTSIDE BIGINT \n" +
") \n" +
"WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = '***',\n" +
" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
//" 'lookup.cache.ttl' = '1s', \n" +
" 'table-name' = 'dob_dim_account' \n" +
//" 'lookup.cache.max-rows' = '1000' \n" +
//" 'lookup.cache.ttl' = '1 minute',\n" +
//" 'lookup.max-retries' = '3' \n" +
" )";
But this approach does not seem to solve my earlier problem either, and when a new account is inserted, newly arriving records still fail to join against it.
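One thing the commented-out options in the DDL above hint at: with a lookup join, the JDBC connector caches lookup results, and rows that already went through unmatched are never retried. A minimal sketch of a dimension table with a short-lived lookup cache (using the legacy Flink JDBC option names that already appear commented out above; newer Flink versions spell these 'lookup.cache' = 'PARTIAL' with 'lookup.partial-cache.expire-after-write', and the URL here is a placeholder) would at least shorten the window in which a newly inserted account is invisible:

```sql
-- Hypothetical sketch: lookup-joined dimension table with a short cache TTL,
-- so accounts inserted into MySQL become visible to the lookup join quickly.
-- It does NOT retroactively fix rows that were already emitted unmatched.
CREATE TABLE dob_dim_account (
  ACCT_CODE STRING,
  CORP_ID   STRING
) WITH (
  'connector'             = 'jdbc',
  'url'                   = 'jdbc:mysql://localhost:3306/dim',  -- placeholder
  'table-name'            = 'dob_dim_account',
  'username'              = 'root',
  'password'              = '123456',
  'lookup.cache.max-rows' = '1000',
  'lookup.cache.ttl'      = '1 min'
);
```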
---- Replied Message ----
| From | Zhiwen Sun<pe...@gmail.com> |
| Date | 11/11/2022 14:08 |
| To | <us...@flink.apache.org> |
| Subject | Re: flinksql join |
Use a regular join, not a lookup join.
Zhiwen Sun
Re: flinksql join
Posted by Zhiwen Sun <pe...@gmail.com>.
Use a regular join, not a lookup join.
Zhiwen Sun
Re: flinksql join
Posted by Zhiwen Sun <pe...@gmail.com>.
If the dob_dim_account dimension table uses the jdbc connector, Flink reads all of its data once at initialization; subsequent updates in the database do not trigger any Flink computation.
To solve this, dob_dim_account needs to become a stream table.
Zhiwen Sun
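A hedged sketch of what "becoming a stream table" could look like, using the mysql-cdc connector from the flink-cdc project (a separate dependency; all connection details below are placeholders). The table then becomes an unbounded changelog source, so accounts inserted or updated later still flow into the join:

```sql
-- Hypothetical sketch (requires the flink-connector-mysql-cdc dependency;
-- hostname and credentials are placeholders). As a changelog source, later
-- INSERTs/UPDATEs to dob_dim_account reach the regular join and can
-- retract and re-emit previously unmatched aggregates.
CREATE TABLE dob_dim_account (
  ACCT_CODE STRING,
  CORP_ID   STRING,
  PRIMARY KEY (ACCT_CODE) NOT ENFORCED
) WITH (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'localhost',
  'port'          = '3306',
  'username'      = 'root',
  'password'      = '123456',
  'database-name' = 'dim',
  'table-name'    = 'dob_dim_account'
);
```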
Re: flinksql join
Posted by Jason_H <hy...@163.com>.
hi,
That approach requires CDC, but in our current plan the leadership will not consider CDC; they only want to solve this with plain Flink SQL.
Re: flinksql join
Posted by 任召金 <re...@100.me>.
hello, you could try turning the MySQL data into a stream via CDC and then inner joining it with the main stream; mind the state TTL.
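The state-TTL caveat can be expressed as a single table-config setting; a sketch (the key `table.exec.state.ttl` exists in recent Flink versions; the `SET` syntax below is SQL-client style, and the one-hour value is only illustrative):

```sql
-- Sketch: bound the state of the regular join. Rows idle in state longer
-- than the TTL are dropped and can no longer be matched, so pick a TTL
-- larger than the expected delay of dimension-table inserts.
SET 'table.exec.state.ttl' = '1 h';
```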
Re: flinksql join
Posted by Jason_H <hy...@163.com>.
hi,
I would like to handle this with the existing Flink SQL join: when the dimension table is updated late, the fact data should wait in state.
Re:flinksql join
Posted by RS <ti...@163.com>.
Hi,
My understanding is that it is normal for dimension rows inserted later not to match.
If you want the AAAA count of 3, you would have to manually re-run the historical data and then update the existing results.
Thanks