Posted to user-zh@flink.apache.org by zhang hao <zh...@gmail.com> on 2021/01/21 07:47:45 UTC

flink sql: memory still spikes even when executing a statement with a very small LIMIT

A question: I am using Flink SQL to pull data from MySQL. The MySQL source table has tens of millions of rows, and I ran select * from sourcTable
limit 10;
Even a LIMIT of just a few rows causes memory usage to spike. Is the LIMIT here applied directly in the Flink TaskManager only after the MySQL source table has been read in full?
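
The behavior described above can be modeled outside Flink. The sketch below is plain Python and purely illustrative (it is not Flink or JDBC connector code); it contrasts the two strategies: applying the LIMIT downstream after a full scan versus pushing the limit into the scan itself.

```python
# Toy model of LIMIT handling (illustration only, not Flink code).
# Without limit pushdown, the source emits every row and the LIMIT
# operator downstream keeps only the first few; with pushdown, the
# scan itself stops reading after `limit` rows.

def scan(table, limit=None):
    """Simulated table scan; a non-None `limit` models a pushed-down LIMIT."""
    out = []
    for row in table:
        if limit is not None and len(out) >= limit:
            break  # pushdown: stop reading the source early
        out.append(row)
    return out

table = list(range(1_000_000))  # stands in for a large MySQL table

# No pushdown: all 1,000,000 rows cross into the job, LIMIT applied after.
rows_no_pushdown = scan(table)
result_no_pushdown = rows_no_pushdown[:10]

# With pushdown: the scan reads only 10 rows.
result_pushdown = scan(table, limit=10)

print(len(rows_no_pushdown), len(result_pushdown))  # 1000000 10
```

Both strategies return the same 10 rows, but without pushdown the full table is transferred and buffered first, which matches the memory spike described.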

Re: flink sql: memory still spikes even when executing a statement with a very small LIMIT

Posted by Shengkai Fang <fs...@gmail.com>.
hi,
The error message is: java.lang.UnsupportedOperationException: Currently, a
DynamicTableSource with SupportsLimitPushDown ability is not supported.

If you are not on version 1.12, you also need to pick the planner rule [1]. You can follow this JIRA ticket [2], which covers all of the optimizations for the SupportsXXX abilities.

If this is just for local testing, I would still recommend using the released 1.12 plus the commit mentioned earlier; cherry-picking it yourself can be error-prone.

[1] https://github.com/apache/flink/pull/12964
[2] https://issues.apache.org/jira/browse/FLINK-16987
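
To make the pieces above concrete: a source declares the SupportsLimitPushDown ability, and the planner rule from [1] hands the limit to the source instead of keeping a downstream Limit over a full scan. The Python sketch below is only a conceptual model of that handshake (the real interfaces are Java, in the Flink table planner), with all names chosen for illustration.

```python
# Conceptual model (illustrative Python, not Flink's Java API) of the
# SupportsLimitPushDown handshake: the planner rule checks whether the
# scan's source declares the ability and, if so, pushes the limit in.

class LimitAwareSource:
    """Toy source declaring a SupportsLimitPushDown-style ability."""
    def __init__(self):
        self.pushed_limit = None

    def apply_limit(self, limit):
        # models the Java-side applyLimit call
        self.pushed_limit = limit

def push_limit_into_scan(source, limit):
    """Toy stand-in for the planner rule added in [1]."""
    if hasattr(source, "apply_limit"):
        source.apply_limit(limit)
        return True   # limit is now enforced at the scan
    return False      # limit stays as a separate downstream operator

src = LimitAwareSource()
pushed = push_limit_into_scan(src, 10)
print(pushed, src.pushed_limit)  # True 10
```

Without both pieces (the ability on the source and the rule in the planner), the planner either rejects the source, as in the error above, or falls back to a full scan.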

zhang hao <zh...@gmail.com> wrote on Mon, Jan 25, 2021 at 3:14 PM:

> flink run -py new_jdbc_source.py
> Traceback (most recent call last):
>   File "new_jdbc_source.py", line 66, in <module>
>     st_env.execute_sql("select * from feature_bar_sink").print()
> ...
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o10.executeSql.
> : java.lang.UnsupportedOperationException: Currently, a DynamicTableSource
> with SupportsLimitPushDown ability is not supported.
> ...
>
> I tried cherry-picking these commits:
>
> https://github.com/apache/flink/pull/13800/commits/ba4c6121faa50f3aa26b8c05bf7ea36b85d82642
> and this error appeared.
>
> On Fri, Jan 22, 2021 at 11:35 AM Shengkai Fang <fs...@gmail.com> wrote:
>
> > hi, LIMIT push down was recently merged into the master branch.
> >
> > [1] https://github.com/apache/flink/pull/13800
> >
> > Land <la...@qq.com> wrote on Fri, Jan 22, 2021 at 11:28 AM:
> >
> > > The LIMIT was probably not pushed down to MySQL for execution.
> > > The problem looks similar to one I ran into:
> > > http://apache-flink.147419.n8.nabble.com/Flink-MySQL-td10374.html
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/
> > >
> >
>

Re: flink sql: memory still spikes even when executing a statement with a very small LIMIT

Posted by zhang hao <zh...@gmail.com>.
flink run -py new_jdbc_source.py
Traceback (most recent call last):
  File "new_jdbc_source.py", line 66, in <module>
    st_env.execute_sql("select * from feature_bar_sink").print()
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 543, in execute_sql
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
: java.lang.UnsupportedOperationException: Currently, a DynamicTableSource with SupportsLimitPushDown ability is not supported.
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable$$anonfun$validateTableSource$1.apply(CatalogSourceTable.scala:210)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable$$anonfun$validateTableSource$1.apply(CatalogSourceTable.scala:208)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.validateTableSource(CatalogSourceTable.scala:208)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:142)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)


org.apache.flink.client.program.ProgramAbortException
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

I tried cherry-picking these commits:
https://github.com/apache/flink/pull/13800/commits/ba4c6121faa50f3aa26b8c05bf7ea36b85d82642
and this error appeared.

On Fri, Jan 22, 2021 at 11:35 AM Shengkai Fang <fs...@gmail.com> wrote:

> hi, LIMIT push down was recently merged into the master branch.
>
> [1] https://github.com/apache/flink/pull/13800
>
> Land <la...@qq.com> wrote on Fri, Jan 22, 2021 at 11:28 AM:
>
> > The LIMIT was probably not pushed down to MySQL for execution.
> > The problem looks similar to one I ran into:
> > http://apache-flink.147419.n8.nabble.com/Flink-MySQL-td10374.html
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>

Re: flink sql: memory still spikes even when executing a statement with a very small LIMIT

Posted by zhang hao <zh...@gmail.com>.
OK, got it. Thanks, everyone. This should indeed be the problem; I'll merge it into my branch and verify.

On Fri, Jan 22, 2021 at 11:35 AM Shengkai Fang <fs...@gmail.com> wrote:

> hi, LIMIT push down was recently merged into the master branch.
>
> [1] https://github.com/apache/flink/pull/13800
>
> Land <la...@qq.com> wrote on Fri, Jan 22, 2021 at 11:28 AM:
>
> > The LIMIT was probably not pushed down to MySQL for execution.
> > The problem looks similar to one I ran into:
> > http://apache-flink.147419.n8.nabble.com/Flink-MySQL-td10374.html
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>

Re: flink sql: memory still spikes even when executing a statement with a very small LIMIT

Posted by Shengkai Fang <fs...@gmail.com>.
hi, LIMIT push down was recently merged into the master branch.

[1] https://github.com/apache/flink/pull/13800

Land <la...@qq.com> wrote on Fri, Jan 22, 2021 at 11:28 AM:

> The LIMIT was probably not pushed down to MySQL for execution.
> The problem looks similar to one I ran into:
> http://apache-flink.147419.n8.nabble.com/Flink-MySQL-td10374.html
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>

Re: flink sql: memory still spikes even when executing a statement with a very small LIMIT

Posted by Land <la...@qq.com>.
The LIMIT was probably not pushed down to MySQL for execution.
The problem looks similar to one I ran into: http://apache-flink.147419.n8.nabble.com/Flink-MySQL-td10374.html



--
Sent from: http://apache-flink.147419.n8.nabble.com/