You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by xin Destiny <nj...@gmail.com> on 2020/07/05 02:51:48 UTC

can't exectue query when table type is datagen

Hi, all:
i use zeppelin execute sql, FLink version is Flink 1.11 snapshot ,build
from branch release-1.11 ,commit is 334f35cbd6da754d8b5b294032cd84c858b1f973
when the table type is datagen, Flink will thrown exception ,but the
exception message is null ;

My DDL is :
CREATE TABLE datagen_dijie2 (
 f_sequence INT,
 f_random INT,
 f_random_str STRING,
 ts AS localtimestamp,
 WATERMARK FOR ts AS ts
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='5',
 'fields.f_sequence.kind'='sequence',
 'fields.f_sequence.start'='1',
 'fields.f_sequence.end'='1000',
 'fields.f_random.min'='1',
 'fields.f_random.max'='1000',
 'fields.f_random_str.length'='10'
);

My query sql is :
select * from datagen_dijie2;
the exception is :
Fail to run sql command: select * from datagen_dijie2
org.apache.flink.table.api.ValidationException: SQL validation failed. null
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
at
org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:102)
at
org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:526)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:297)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:191)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:156)
at
org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
at
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776)
at
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at
org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
at
org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) Caused by:
java.lang.UnsupportedOperationException at
org.apache.flink.table.planner.ParserImpl.parseSqlExpression(ParserImpl.java:86)
at
org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
at
org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
at
org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
at
org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408)
at
org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375)
at
org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75)
at
org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
at
org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
at
org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
at
org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
at
org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
at
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
at
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
... 19 more

How to deal with ?

Re: can't exectue query when table type is datagen

Posted by Danny Chan <yu...@gmail.com>.
Dear xin Destiny ~

It seems that you use the legacy planner so the exception throws [1] ~

I agree that there needs a prompt here to indicate that it is a legacy planner, have fired an issue [2],
Actually for legacy, it is a regression because before the change, the computed column is supported well.

[1] https://github.com/apache/flink/blob/1b1c343e8964c6400c7c1de3c70212522ba59a64/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java#L86
[2] https://issues.apache.org/jira/browse/FLINK-18500

Best,
Danny Chan
在 2020年7月5日 +0800 AM10:52,xin Destiny <nj...@gmail.com>,写道:
> Hi, all:
> i use zeppelin execute sql, FLink version is Flink 1.11 snapshot ,build from branch release-1.11 ,commit is 334f35cbd6da754d8b5b294032cd84c858b1f973
> when the table type is datagen, Flink will thrown exception ,but the exception message is null ;
>
> My DDL is :
> CREATE TABLE datagen_dijie2 (
>  f_sequence INT,
>  f_random INT,
>  f_random_str STRING,
>  ts AS localtimestamp,
>  WATERMARK FOR ts AS ts
> ) WITH (
>  'connector' = 'datagen',
>  'rows-per-second'='5',
>  'fields.f_sequence.kind'='sequence',
>  'fields.f_sequence.start'='1',
>  'fields.f_sequence.end'='1000',
>  'fields.f_random.min'='1',
>  'fields.f_random.max'='1000',
>  'fields.f_random_str.length'='10'
> );
>
> My query sql is :
> select * from datagen_dijie2;
> the exception is :
> Fail to run sql command: select * from datagen_dijie2 org.apache.flink.table.api.ValidationException: SQL validation failed. null at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658) at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:102) at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:526) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:297) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:191) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:156) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668) at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130) at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.UnsupportedOperationException at org.apache.flink.table.planner.ParserImpl.parseSqlExpression(ParserImpl.java:86) at org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119) at org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83) at org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380) at org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408) at org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375) at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75) at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83) at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143) at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99) at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203) at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112) at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ... 19 more
>
> How to deal with ?

Re: can't exectue query when table type is datagen

Posted by Jingsong Li <ji...@gmail.com>.
Hi,

Looks like you are using the watermark feature with the old Flink planner?

Is this what you expect?  Or can you change the planner to Blink planner?

Best,
Jingsong

On Sun, Jul 5, 2020 at 10:52 AM xin Destiny <nj...@gmail.com> wrote:

> Hi, all:
> i use zeppelin execute sql, FLink version is Flink 1.11 snapshot ,build
> from branch release-1.11 ,commit is 334f35cbd6da754d8b5b294032cd84c858b1f973
> when the table type is datagen, Flink will thrown exception ,but the
> exception message is null ;
>
> My DDL is :
> CREATE TABLE datagen_dijie2 (
>  f_sequence INT,
>  f_random INT,
>  f_random_str STRING,
>  ts AS localtimestamp,
>  WATERMARK FOR ts AS ts
> ) WITH (
>  'connector' = 'datagen',
>  'rows-per-second'='5',
>  'fields.f_sequence.kind'='sequence',
>  'fields.f_sequence.start'='1',
>  'fields.f_sequence.end'='1000',
>  'fields.f_random.min'='1',
>  'fields.f_random.max'='1000',
>  'fields.f_random_str.length'='10'
> );
>
> My query sql is :
> select * from datagen_dijie2;
> the exception is :
> Fail to run sql command: select * from datagen_dijie2
> org.apache.flink.table.api.ValidationException: SQL validation failed. null
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
> at
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:102)
> at
> org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
> at
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:526)
> at
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:297)
> at
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:191)
> at
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:156)
> at
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
> at
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776)
> at
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
> at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at
> org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
> at
> org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748) Caused by:
> java.lang.UnsupportedOperationException at
> org.apache.flink.table.planner.ParserImpl.parseSqlExpression(ParserImpl.java:86)
> at
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
> at
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
> at
> org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
> at
> org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408)
> at
> org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375)
> at
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75)
> at
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
> at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
> at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
> at
> org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
> at
> org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
> at
> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
> at
> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
> ... 19 more
>
> How to deal with ?
>


-- 
Best, Jingsong Lee