Posted to user-zh@flink.apache.org by Jason_H <hy...@163.com> on 2022/11/11 03:10:03 UTC

flinksql join


Hi all,
I am using Flink SQL to implement a dimension-table join. The source is Kafka (transaction data) and the dimension table is in MySQL (accounts). The problem: when a record arrives from Kafka and its account is not yet in the dimension table, I insert the account manually; the next record then matches the account, but the accumulated output is missing the earlier records that did not match. For example:
Kafka input:
Account Amount Count
1111 100 1  -> no match
1111 100 1  -> no match
1111 100 1  -> matched

Dimension table
Account  Company
2222  BBBB
1111  AAAA   -> account inserted later
Actual output:
Company  Amount  Count
AAAA 100   1


The result I want:
Company  Amount  Count
AAAA 300   3





The SQL is as follows:
String sql2 =  "insert into dws_b2b_trade_year_index\n" +
               "WITH temp AS (\n" +
               "select \n" +
               "  ta.gmtStatistical as gmtStatistical,\n" +
               "  ta.paymentMethod as paymentMethod,\n" +
               "  tb.CORP_ID as outCorpId,\n" +
               "  tc.CORP_ID as inCorpId,\n" +
               "  sum(ta.tradeAmt) as tranAmount,\n" +
               "  sum(ta.tradeCnt) as tranNum \n" +
               "from dws_a2a_trade_year_index ta \n" +
               "left join dob_dim_account for system_time as of ta.proc as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
               "left join dob_dim_account for system_time as of ta.proc as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
               "group by \n" +
               " ta.gmtStatistical, \n" +
               " ta.paymentMethod, \n" +
               " tb.CORP_ID, \n" +
               " tc.CORP_ID \n" +
               ") \n" +
               "SELECT \n" +
               "   DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
               "   gmtStatistical, \n" +
               "   paymentMethod, \n" +
               "   outCorpId, \n" +
               "   inCorpId, \n" +
               "   tranAmount, \n" +
               "   tranNum \n" +
               "FROM temp";
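With a lookup join, each fact row probes the dimension table once at its processing time, and a miss is final; that is why the two earlier rows never get counted. If the Flink version is 1.16 or later, the LOOKUP hint can retry a missed lookup for a while, which may be enough when the account row is inserted shortly afterwards. A hedged sketch (the option names assume the 1.16+ hint syntax):

```sql
-- Sketch, assuming Flink >= 1.16: retry a lookup that misses, so a
-- dimension row inserted moments later can still be matched.
SELECT /*+ LOOKUP('table'='dob_dim_account',
                  'retry-predicate'='lookup_miss',
                  'retry-strategy'='fixed_delay',
                  'fixed-delay'='10s',
                  'max-attempts'='3') */
  ta.gmtStatistical, tb.CORP_ID, ta.tradeAmt, ta.tradeCnt
FROM dws_a2a_trade_year_index ta
LEFT JOIN dob_dim_account FOR SYSTEM_TIME AS OF ta.proc AS tb
  ON ta.outAcctCode = tb.ACCT_CODE;
```

Note that retrying holds the record back: with a 10s fixed delay and 3 attempts, a row waits up to roughly 30s before giving up, so this only covers short insertion lags, not arbitrary delays.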

Jason_H
hyb_hello@163.com

Re: flinksql join

Posted by Jason_H <hy...@163.com>.
I tried using a regular join:
String sql2 = "insert into dws_b2b_trade_year_index\n" +
"WITH temp AS (\n" +
"select \n" +
"  ta.gmtStatistical as gmtStatistical,\n" +
"  ta.paymentMethod as paymentMethod,\n" +
"  tb.CORP_ID as outCorpId,\n" +
"  tc.CORP_ID as inCorpId,\n" +
"  sum(ta.tradeAmt) as tranAmount,\n" +
"  sum(ta.tradeCnt) as tranNum \n" +
"from dws_a2a_trade_year_index ta \n" +
"left join dob_dim_account as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
"left join dob_dim_account as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
"group by \n" +
" ta.gmtStatistical, \n" +
" ta.paymentMethod, \n" +
" tb.CORP_ID, \n" +
" tc.CORP_ID \n" +
") \n" +
"SELECT \n" +
"   DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
"   gmtStatistical, \n" +
"   paymentMethod, \n" +
"   outCorpId, \n" +
"   inCorpId, \n" +
"   tranAmount, \n" +
"   tranNum \n" +
"FROM temp";
This is how I create the MySQL table:
String accountDimSource = "CREATE TABLE dob_dim_account (\n" +
"    DIM_ACCOUNT_ID  string ,\n" +
"    GMT_CREATE  string ,\n" +
"    ts_date AS TO_TIMESTAMP(GMT_CREATE), \n" +
"    GMT_UPDATE  string ,\n" +
"    ACCT_CODE  string ,\n" +
"    CUST_ID  string ,\n" +
"    CUST_NAME  string ,\n" +
"    CORP_ID  string ,\n" +
"    CORP_CERT_CODE  string ,\n" +
"    CORP_CERT_TYPE  string ,\n" +
"    CUST_MANAGER_JOB_CODE  string ,\n" +
"    TEAM_CODE  string ,\n" +
"    ORG_ID  string, \n" +
"    SUPER_ORG_ID  string, \n" +
"    IS_OUTSIDE  BIGINT \n" +
") \n" +
"WITH (\n" +
"  'connector' = 'jdbc',\n" +
"  'url' = '***',\n" +
"  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
"  'username' = 'root',\n" +
"  'password' = '123456',\n" +
//"  'lookup.cache.ttl' = '1s', \n" +
"  'table-name' = 'dob_dim_account' \n" +
//"  'lookup.cache.max-rows' = '1000' \n" +
//"  'lookup.cache.ttl' = '1 minute',\n" +
//"  'lookup.max-retries' = '3' \n" +
" )";
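A side note on the commented-out option lines: re-enabled as written, the commas no longer line up (the 'table-name' line has no trailing comma before 'lookup.cache.max-rows'). A trimmed sketch with the lookup options enabled and the commas fixed, keeping in mind that these options only take effect when the table is used as a lookup source (FOR SYSTEM_TIME AS OF ...), not in a regular join; the URL is a placeholder:

```sql
-- Sketch: JDBC dimension table with the lookup cache enabled.
CREATE TABLE dob_dim_account (
  ACCT_CODE STRING,
  CORP_ID STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',  -- placeholder
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'dob_dim_account',
  'lookup.cache.max-rows' = '1000',
  'lookup.cache.ttl' = '1 minute',
  'lookup.max-retries' = '3'
);
```

Also note the cache works the other way from what is wanted here: a cached row (or cached miss, depending on version) is served until the TTL expires, so a short TTL makes late inserts visible sooner but does not recover rows that already missed.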


However, this approach doesn't seem to solve my earlier problem either, and when a new account is added, newly arriving records still fail to join to it.



Jason_H
hyb_hello@163.com


Re: flinksql join

Posted by Zhiwen Sun <pe...@gmail.com>.
Use a regular join, not a lookup join.

Zhiwen Sun




Re: flinksql join

Posted by Zhiwen Sun <pe...@gmail.com>.
If the dob_dim_account dimension table uses the jdbc connector, Flink reads all of its data once at initialization; subsequent updates in the database will not trigger any Flink computation.

To solve this, dob_dim_account needs to become a streaming (changelog) table.
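A minimal sketch of such a changelog table, assuming the flink-connector-mysql-cdc dependency is on the classpath (table name dob_dim_account_cdc, hostname and database name are placeholders):

```sql
-- Sketch: dob_dim_account as a CDC changelog table, so INSERTs and
-- UPDATEs made later in MySQL flow into the job as stream events.
CREATE TABLE dob_dim_account_cdc (
  ACCT_CODE STRING,
  CORP_ID STRING,
  PRIMARY KEY (ACCT_CODE) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',        -- placeholder
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',        -- placeholder
  'table-name' = 'dob_dim_account'
);
```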


Zhiwen Sun




Re: flinksql join

Posted by Jason_H <hy...@163.com>.
Hi,
That approach requires CDC, but our leadership won't consider CDC in the current plan; they only want to solve this with Flink SQL.


Jason_H
hyb_hello@163.com

Re: flinksql join

Posted by 任召金 <re...@100.me>.
Hello, you could try turning the MySQL data into a stream via CDC and then doing an inner join with the main stream; mind the state TTL.
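The state TTL mentioned here can be set as a table config option, e.g. in the SQL client (a sketch; the same key can be set on the TableConfig in Java):

```sql
-- Bound how long join/aggregation state is kept; without a TTL a
-- regular streaming join retains both sides' rows indefinitely.
SET 'table.exec.state.ttl' = '1 h';
```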

Re: flinksql join

Posted by Jason_H <hy...@163.com>.
Hi,
I'd like to handle this with Flink SQL's existing joins: when the dimension table is slow to update, the fact data should wait in state until it can be matched.
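That "wait in state" behavior is what a regular (unbounded) streaming join gives, provided the dimension side is a changelog source: unmatched fact rows sit in join state, and when the account row arrives later, Flink emits retractions and the downstream aggregate corrects itself. A hedged sketch (dob_dim_account_changelog is a hypothetical CDC-backed table name):

```sql
-- Sketch: regular join instead of lookup join. When the account row
-- for 1111 later appears on the changelog side, the earlier NULL-joined
-- results are retracted and the sums are corrected to AAAA 300 3.
SELECT
  tb.CORP_ID            AS outCorpId,
  SUM(ta.tradeAmt)      AS tranAmount,
  SUM(ta.tradeCnt)      AS tranNum
FROM dws_a2a_trade_year_index ta
LEFT JOIN dob_dim_account_changelog tb   -- must be a changelog/CDC table
  ON ta.outAcctCode = tb.ACCT_CODE
GROUP BY tb.CORP_ID;
```

The trade-off is state size: without a TTL, both join inputs are retained forever, so this needs a state TTL (or bounded keys) to stay healthy.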


Jason_H
hyb_hello@163.com


Re:flinksql join

Posted by RS <ti...@163.com>.
Hi,
My understanding is that it is expected behavior that dimension rows inserted later cannot be matched. To get AAAA = 3, you would need to manually re-run the historical data and then update the existing results.


Thanks





