You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 111 <xi...@163.com> on 2020/04/02 01:55:27 UTC

Flink SQL1.10使用Mysql作为维表关联,数据量太大导致task manager time out

Hi,
想确认下MySQL JDBC Connector的实现细节,目前了解到的实现是:
1 获取查询sql中的字段和表名,拼接成select a, b, c from t
2 创建并行输入,如果制定了分区字段,会按照分区字段和并行度切分生成where条件,如where id between xxx and xxx
3 执行sql,加载到内存(不确定后面缓存的实现细节)


目前我们遇到的问题是,我们把mysql作为维表(表的量级在一千500万左右,并行度为10),没有指定分区条件(只有一个slot执行查询,其他的都没有查询任务)。
结果导致只有一个分区查询数据,查询的sql为select xxx from t(全表查询)。
可是现在并行度由于集群的限制,不能调整的过大,如何实现按需查询呢?


之前我们在datastream api中,是使用异步API,根据某个条件单独查询某条数据,所以速度很快;现在需要全部加载过来,资源消耗太大了。
不知道有没有什么优雅的解决方案?


Best,
Xinghalo

回复: Flink SQL1.10使用Mysql作为维表关联,数据量太大导致task manager time out

Posted by 111 <xi...@163.com>.

Hi benchao,
原来如此,我这边只是做了普通查询,并没有走join。
我加上join条件再试下哈
Best,
Xinghalo
在2020年04月2日 10:11,Benchao Li<li...@gmail.com> 写道:
Hi,

能否把你的SQL也发出来呢?
正常来讲,维表关联用的是join的等值条件作为关联的条件去mysql查询,然后如果还有其他的filter,会在关联之后的结果之上在做filter。如果你发现每次都是扫描全表,很有可能是你的维表join的条件写的有问题导致的。

111 <xi...@163.com> 于2020年4月2日周四 上午9:55写道:

Hi,
想确认下MySQL JDBC Connector的实现细节,目前了解到的实现是:
1 获取查询sql中的字段和表名,拼接成select a, b, c from t
2 创建并行输入,如果制定了分区字段,会按照分区字段和并行度切分生成where条件,如where id between xxx and xxx
3 执行sql,加载到内存(不确定后面缓存的实现细节)



目前我们遇到的问题是,我们把mysql作为维表(表的量级在一千500万左右,并行度为10),没有指定分区条件(只有一个slot执行查询,其他的都没有查询任务)。
结果导致只有一个分区查询数据,查询的sql为select xxx from t(全表查询)。
可是现在并行度由于集群的限制,不能调整的过大,如何实现按需查询呢?


之前我们在datastream api中,是使用异步API,根据某个条件单独查询某条数据,所以速度很快;现在需要全部加载过来,资源消耗太大了。
不知道有没有什么优雅的解决方案?


Best,
Xinghalo



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

Re: Flink SQL1.10使用Mysql作为维表关联,数据量太大导致task manager time out

Posted by godfrey he <go...@gmail.com>.
Hi Xinghalo,

欢迎向 sql gateway 贡献~

Best,
Godfrey

111 <xi...@163.com> 于2020年4月2日周四 上午11:10写道:

> Hi,
> 了解了,那我知道怎么解决了。我这边使用的是sql-gateway,看样子得在sql-gateway里面加一种表定义的语法了。
> 多谢多谢
>
>
> Best,
> Xinghalo
> 在2020年04月2日 10:52,Benchao Li<li...@gmail.com> 写道:
> 你写的不是维表join的语法,维表join现在用的是temporal table[1] 的方式来实现的,需要特殊的join语法:
>
> SELECT
> o.amout, o.currency, r.rate, o.amount * r.rateFROM
> Orders AS o
> JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
> ON r.currency = o.currency
>
> 此外,你看的JDBCTableSource是一个普通的bounded source,或者是batch
> source。真正的维表的代码是在JDBCLookupFunction里面的。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
>
> 111 <xi...@163.com> 于2020年4月2日周四 上午10:33写道:
>
> Hi,
> 试验了下貌似不行,我的sql:
>
>
> select s.*, item.product_name
> from  ( select member_id, uuid, page, cast(SPLIT_INDEX(page,
> '10.pd.item-', 1) as string) as item_id,  `time` from tgs_topic_t1  where
> page like '10.pd.item-%’ ) s
> inner join PROD_ITEM_MALL_ACTIVITY_PRODUCT item on cast(item.id as
> string) = s.item_id
> where s.item_id is not null
>
>
> 看了下代码JDBCTableSource中的实现
> String query = dialect.getSelectFromStatement( options.getTableName(),
> returnType.getFieldNames(), new String[0]);
> 构建sql的时候  conditionFields 是写死的空数组,因此肯定不会有查询条件参与进来。
> 后面真正open()连接查询的时候,也只处理了分区字段,并没有查询条件字段。
> 在2020年04月2日 10:11,Benchao Li<li...@gmail.com> 写道:
> Hi,
>
> 能否把你的SQL也发出来呢?
>
>
> 正常来讲,维表关联用的是join的等值条件作为关联的条件去mysql查询,然后如果还有其他的filter,会在关联之后的结果之上在做filter。如果你发现每次都是扫描全表,很有可能是你的维表join的条件写的有问题导致的。
>
> 111 <xi...@163.com> 于2020年4月2日周四 上午9:55写道:
>
> Hi,
> 想确认下MySQL JDBC Connector的实现细节,目前了解到的实现是:
> 1 获取查询sql中的字段和表名,拼接成select a, b, c from t
> 2 创建并行输入,如果制定了分区字段,会按照分区字段和并行度切分生成where条件,如where id between xxx and xxx
> 3 执行sql,加载到内存(不确定后面缓存的实现细节)
>
>
>
>
>
> 目前我们遇到的问题是,我们把mysql作为维表(表的量级在一千500万左右,并行度为10),没有指定分区条件(只有一个slot执行查询,其他的都没有查询任务)。
> 结果导致只有一个分区查询数据,查询的sql为select xxx from t(全表查询)。
> 可是现在并行度由于集群的限制,不能调整的过大,如何实现按需查询呢?
>
>
> 之前我们在datastream api中,是使用异步API,根据某个条件单独查询某条数据,所以速度很快;现在需要全部加载过来,资源消耗太大了。
> 不知道有没有什么优雅的解决方案?
>
>
> Best,
> Xinghalo
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>

回复: Flink SQL1.10使用Mysql作为维表关联,数据量太大导致task manager time out

Posted by 111 <xi...@163.com>.
Hi,
了解了,那我知道怎么解决了。我这边使用的是sql-gateway,看样子得在sql-gateway里面加一种表定义的语法了。
多谢多谢


Best,
Xinghalo
在2020年04月2日 10:52,Benchao Li<li...@gmail.com> 写道:
你写的不是维表join的语法,维表join现在用的是temporal table[1] 的方式来实现的,需要特殊的join语法:

SELECT
o.amout, o.currency, r.rate, o.amount * r.rateFROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency

此外,你看的JDBCTableSource是一个普通的bounded source,或者是batch
source。真正的维表的代码是在JDBCLookupFunction里面的。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins

111 <xi...@163.com> 于2020年4月2日周四 上午10:33写道:

Hi,
试验了下貌似不行,我的sql:


select s.*, item.product_name
from  ( select member_id, uuid, page, cast(SPLIT_INDEX(page,
'10.pd.item-', 1) as string) as item_id,  `time` from tgs_topic_t1  where
page like '10.pd.item-%’ ) s
inner join PROD_ITEM_MALL_ACTIVITY_PRODUCT item on cast(item.id as
string) = s.item_id
where s.item_id is not null


看了下代码JDBCTableSource中的实现
String query = dialect.getSelectFromStatement( options.getTableName(),
returnType.getFieldNames(), new String[0]);
构建sql的时候  conditionFields 是写死的空数组,因此肯定不会有查询条件参与进来。
后面真正open()连接查询的时候,也只处理了分区字段,并没有查询条件字段。
在2020年04月2日 10:11,Benchao Li<li...@gmail.com> 写道:
Hi,

能否把你的SQL也发出来呢?

正常来讲,维表关联用的是join的等值条件作为关联的条件去mysql查询,然后如果还有其他的filter,会在关联之后的结果之上在做filter。如果你发现每次都是扫描全表,很有可能是你的维表join的条件写的有问题导致的。

111 <xi...@163.com> 于2020年4月2日周四 上午9:55写道:

Hi,
想确认下MySQL JDBC Connector的实现细节,目前了解到的实现是:
1 获取查询sql中的字段和表名,拼接成select a, b, c from t
2 创建并行输入,如果制定了分区字段,会按照分区字段和并行度切分生成where条件,如where id between xxx and xxx
3 执行sql,加载到内存(不确定后面缓存的实现细节)




目前我们遇到的问题是,我们把mysql作为维表(表的量级在一千500万左右,并行度为10),没有指定分区条件(只有一个slot执行查询,其他的都没有查询任务)。
结果导致只有一个分区查询数据,查询的sql为select xxx from t(全表查询)。
可是现在并行度由于集群的限制,不能调整的过大,如何实现按需查询呢?


之前我们在datastream api中,是使用异步API,根据某个条件单独查询某条数据,所以速度很快;现在需要全部加载过来,资源消耗太大了。
不知道有没有什么优雅的解决方案?


Best,
Xinghalo



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

Re: Flink SQL1.10使用Mysql作为维表关联,数据量太大导致task manager time out

Posted by Benchao Li <li...@gmail.com>.
你写的不是维表join的语法,维表join现在用的是temporal table[1] 的方式来实现的,需要特殊的join语法:

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rateFROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

此外,你看的JDBCTableSource是一个普通的bounded source,或者是batch
source。真正的维表的代码是在JDBCLookupFunction里面的。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins

111 <xi...@163.com> 于2020年4月2日周四 上午10:33写道:

> Hi,
> 试验了下貌似不行,我的sql:
>
>
> select s.*, item.product_name
> from  ( select member_id, uuid, page, cast(SPLIT_INDEX(page,
> '10.pd.item-', 1) as string) as item_id,  `time` from tgs_topic_t1  where
> page like '10.pd.item-%’ ) s
> inner join PROD_ITEM_MALL_ACTIVITY_PRODUCT item on cast(item.id as
> string) = s.item_id
> where s.item_id is not null
>
>
> 看了下代码JDBCTableSource中的实现
> String query = dialect.getSelectFromStatement( options.getTableName(),
> returnType.getFieldNames(), new String[0]);
> 构建sql的时候  conditionFields 是写死的空数组,因此肯定不会有查询条件参与进来。
> 后面真正open()连接查询的时候,也只处理了分区字段,并没有查询条件字段。
> 在2020年04月2日 10:11,Benchao Li<li...@gmail.com> 写道:
> Hi,
>
> 能否把你的SQL也发出来呢?
>
> 正常来讲,维表关联用的是join的等值条件作为关联的条件去mysql查询,然后如果还有其他的filter,会在关联之后的结果之上在做filter。如果你发现每次都是扫描全表,很有可能是你的维表join的条件写的有问题导致的。
>
> 111 <xi...@163.com> 于2020年4月2日周四 上午9:55写道:
>
> Hi,
> 想确认下MySQL JDBC Connector的实现细节,目前了解到的实现是:
> 1 获取查询sql中的字段和表名,拼接成select a, b, c from t
> 2 创建并行输入,如果制定了分区字段,会按照分区字段和并行度切分生成where条件,如where id between xxx and xxx
> 3 执行sql,加载到内存(不确定后面缓存的实现细节)
>
>
>
>
> 目前我们遇到的问题是,我们把mysql作为维表(表的量级在一千500万左右,并行度为10),没有指定分区条件(只有一个slot执行查询,其他的都没有查询任务)。
> 结果导致只有一个分区查询数据,查询的sql为select xxx from t(全表查询)。
> 可是现在并行度由于集群的限制,不能调整的过大,如何实现按需查询呢?
>
>
> 之前我们在datastream api中,是使用异步API,根据某个条件单独查询某条数据,所以速度很快;现在需要全部加载过来,资源消耗太大了。
> 不知道有没有什么优雅的解决方案?
>
>
> Best,
> Xinghalo
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

回复: Flink SQL1.10使用Mysql作为维表关联,数据量太大导致task manager time out

Posted by 111 <xi...@163.com>.
Hi,
试验了下貌似不行,我的sql:


select s.*, item.product_name
from  ( select member_id, uuid, page, cast(SPLIT_INDEX(page, '10.pd.item-', 1) as string) as item_id,  `time` from tgs_topic_t1  where page like '10.pd.item-%’ ) s
inner join PROD_ITEM_MALL_ACTIVITY_PRODUCT item on cast(item.id as string) = s.item_id
where s.item_id is not null


看了下代码JDBCTableSource中的实现
String query = dialect.getSelectFromStatement( options.getTableName(), returnType.getFieldNames(), new String[0]);
构建sql的时候  conditionFields 是写死的空数组,因此肯定不会有查询条件参与进来。
后面真正open()连接查询的时候,也只处理了分区字段,并没有查询条件字段。
在2020年04月2日 10:11,Benchao Li<li...@gmail.com> 写道:
Hi,

能否把你的SQL也发出来呢?
正常来讲,维表关联用的是join的等值条件作为关联的条件去mysql查询,然后如果还有其他的filter,会在关联之后的结果之上在做filter。如果你发现每次都是扫描全表,很有可能是你的维表join的条件写的有问题导致的。

111 <xi...@163.com> 于2020年4月2日周四 上午9:55写道:

Hi,
想确认下MySQL JDBC Connector的实现细节,目前了解到的实现是:
1 获取查询sql中的字段和表名,拼接成select a, b, c from t
2 创建并行输入,如果制定了分区字段,会按照分区字段和并行度切分生成where条件,如where id between xxx and xxx
3 执行sql,加载到内存(不确定后面缓存的实现细节)



目前我们遇到的问题是,我们把mysql作为维表(表的量级在一千500万左右,并行度为10),没有指定分区条件(只有一个slot执行查询,其他的都没有查询任务)。
结果导致只有一个分区查询数据,查询的sql为select xxx from t(全表查询)。
可是现在并行度由于集群的限制,不能调整的过大,如何实现按需查询呢?


之前我们在datastream api中,是使用异步API,根据某个条件单独查询某条数据,所以速度很快;现在需要全部加载过来,资源消耗太大了。
不知道有没有什么优雅的解决方案?


Best,
Xinghalo



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

Re: Flink SQL1.10使用Mysql作为维表关联,数据量太大导致task manager time out

Posted by Benchao Li <li...@gmail.com>.
Hi,

能否把你的SQL也发出来呢?
正常来讲,维表关联用的是join的等值条件作为关联的条件去mysql查询,然后如果还有其他的filter,会在关联之后的结果之上在做filter。如果你发现每次都是扫描全表,很有可能是你的维表join的条件写的有问题导致的。

111 <xi...@163.com> 于2020年4月2日周四 上午9:55写道:

> Hi,
> 想确认下MySQL JDBC Connector的实现细节,目前了解到的实现是:
> 1 获取查询sql中的字段和表名,拼接成select a, b, c from t
> 2 创建并行输入,如果制定了分区字段,会按照分区字段和并行度切分生成where条件,如where id between xxx and xxx
> 3 执行sql,加载到内存(不确定后面缓存的实现细节)
>
>
>
> 目前我们遇到的问题是,我们把mysql作为维表(表的量级在一千500万左右,并行度为10),没有指定分区条件(只有一个slot执行查询,其他的都没有查询任务)。
> 结果导致只有一个分区查询数据,查询的sql为select xxx from t(全表查询)。
> 可是现在并行度由于集群的限制,不能调整的过大,如何实现按需查询呢?
>
>
> 之前我们在datastream api中,是使用异步API,根据某个条件单独查询某条数据,所以速度很快;现在需要全部加载过来,资源消耗太大了。
> 不知道有没有什么优雅的解决方案?
>
>
> Best,
> Xinghalo



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn