You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "shizhengchao (Jira)" <ji...@apache.org> on 2020/10/20 11:20:00 UTC
[jira] [Updated] (FLINK-19735) TableFunction can not work in Flink
View
[ https://issues.apache.org/jira/browse/FLINK-19735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
shizhengchao updated FLINK-19735:
---------------------------------
Description:
TableFunction can't be work in Flink Sql. Here is my code:
{code:sql}
CREATE TABLE test (
myField STRING,
name STRING
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'xxxx',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'mygroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv'
);
CREATE TABLE print (
myField STRING,
newWord STRING,
newLength INT
) WITH (
'connector' = 'print'
);
CREATE VIEW test_view AS
SELECT myField, newWord, newLength FROM test, LATERAL TABLE(SplitFunction(myField));
INSERT INTO print
SELECT * FROM test_view;
{code}
And the function code as this:
{code:java}
@FunctionHint(output = @DataTypeHint("ROW<newWordSTRING, newLength INT>"))
public class SplitFunction extends TableFunction<Row> {
public void eval(String str) {
for (String s : str.split(" ")) {
collect(Row.of(s, s.length()));
}
}
}
{code}
run the sql,cause an error:
{code}
Unable to find source-code formatter for language: log. Available languages are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, perl, php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, yamlException in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 2, column 17 to line 2, column 23: Column 'newWord' not found in any table
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52)
at com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208)
at com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200)
at com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129)
at com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, column 17 to line 2, column 23: Column 'newWord' not found in any table
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154)
at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5574)
at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:452)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
... 10 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'newWord' not found in any table
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
... 30 more
{code}
But it work effect in "INSERT INTO" statement:
{code:sql}
INSERT INTO print
SELECT myField, newWord, newLength FROM test, LATERAL TABLE(SplitFunction(myField));
{code}
was:
TableFunction can't be work in Flink Sql. Here is my code:
{code:sql}
CREATE TABLE test (
myField STRING,
name STRING
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'xxxx',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'mygroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv'
);
CREATE TABLE print (
myField STRING,
newWord STRING,
newLength INT
) WITH (
'connector' = 'print'
);
CREATE VIEW test_view AS
SELECT myField, newWord, newLength FROM test, LATERAL TABLE(SplitFunction(myField));
INSERT INTO print
SELECT * FROM test_view;
{code}
And the function code as this:
{code:java}
@FunctionHint(output = @DataTypeHint("ROW<newWordSTRING, newLength INT>"))
public class SplitFunction extends TableFunction<Row> {
public void eval(String str) {
for (String s : str.split(" ")) {
collect(Row.of(s, s.length()));
}
}
}
{code}
run the sql,cause an error:
{code}
Unable to find source-code formatter for language: log. Available languages are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, perl, php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, yamlException in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 2, column 17 to line 2, column 23: Column 'newWord' not found in any table
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52)
at com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208)
at com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200)
at com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129)
at com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, column 17 to line 2, column 23: Column 'newWord' not found in any table
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154)
at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5574)
at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:452)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
... 10 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'newWord' not found in any table
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
... 30 more
{code}
> TableFunction can not work in Flink View
> ----------------------------------------
>
> Key: FLINK-19735
> URL: https://issues.apache.org/jira/browse/FLINK-19735
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.11.1
> Reporter: shizhengchao
> Priority: Major
>
> TableFunction can't be work in Flink Sql. Here is my code:
> {code:sql}
> CREATE TABLE test (
> myField STRING,
> name STRING
> ) WITH (
> 'connector' = 'kafka-0.11',
> 'topic' = 'xxxx',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.group.id' = 'mygroup',
> 'scan.startup.mode' = 'latest-offset',
> 'format' = 'csv'
> );
> CREATE TABLE print (
> myField STRING,
> newWord STRING,
> newLength INT
> ) WITH (
> 'connector' = 'print'
> );
> CREATE VIEW test_view AS
> SELECT myField, newWord, newLength FROM test, LATERAL TABLE(SplitFunction(myField));
> INSERT INTO print
> SELECT * FROM test_view;
> {code}
> And the function code as this:
> {code:java}
> @FunctionHint(output = @DataTypeHint("ROW<newWordSTRING, newLength INT>"))
> public class SplitFunction extends TableFunction<Row> {
> public void eval(String str) {
> for (String s : str.split(" ")) {
> collect(Row.of(s, s.length()));
> }
> }
> }
> {code}
> run the sql,cause an error:
> {code}
> Unable to find source-code formatter for language: log. Available languages are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, perl, php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, yamlException in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 2, column 17 to line 2, column 23: Column 'newWord' not found in any table
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52)
> at com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208)
> at com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200)
> at com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129)
> at com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, column 17 to line 2, column 23: Column 'newWord' not found in any table
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
> at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
> at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
> at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154)
> at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140)
> at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5574)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:452)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523)
> at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
> ... 10 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'newWord' not found in any table
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
> ... 30 more
> {code}
> But it work effect in "INSERT INTO" statement:
> {code:sql}
> INSERT INTO print
> SELECT myField, newWord, newLength FROM test, LATERAL TABLE(SplitFunction(myField));
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)