You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Dawid Wysakowicz (Jira)" <ji...@apache.org> on 2020/08/11 07:39:00 UTC

[jira] [Commented] (FLINK-18889) New Async Table Function type inference fails

    [ https://issues.apache.org/jira/browse/FLINK-18889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17175309#comment-17175309 ] 

Dawid Wysakowicz commented on FLINK-18889:
------------------------------------------

This might be a duplicate of FLINK-18890. cc [~twalthr]

> New Async Table Function type inference fails
> ---------------------------------------------
>
>                 Key: FLINK-18889
>                 URL: https://issues.apache.org/jira/browse/FLINK-18889
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.11.1
>            Reporter: Mulan
>            Priority: Major
>
> {code:java}
> @FunctionHint(
>     input = @DataTypeHint("STRING"),
>     output = @DataTypeHint("ROW<ct STRING>")
> )
> public class RedisAsyncTableFunction extends AsyncTableFunction<Row> {
>     private RedisClient redisClient;
>     private StatefulRedisConnection<String, String> connection;
>     private RedisKeyAsyncCommands<String, String> async;
>     private static final String PREFIX = "redis://";
>     private static final String DEFAULT_DB = "0";
>     private static final String DEFAULT_URL = "localhost:6379";
>     private static final String DEFAULT_PASSWORD = "";
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         final String url = DEFAULT_URL;
>         final String password = DEFAULT_PASSWORD;
>         final String database = DEFAULT_DB;
>         StringBuilder redisUri = new StringBuilder();
>         redisUri.append(PREFIX).append(password).append(url).append("/").append(database);
>         redisClient = RedisClient.create(redisUri.toString());
>         connection = redisClient.connect();
>         async = connection.async();
>     }
>     public void eval(CompletableFuture<Collection<Row>> outputFuture, String key) {
>         RedisFuture<Map<String, String>> redisFuture = ((RedisHashAsyncCommands) async).hgetall(key);
>         redisFuture.thenAccept(new Consumer<Map<String, String>>() {
>             @Override
>             public void accept(Map<String, String> values) {
>                 int len = 1;
>                 Row row = new Row(len);
>                 row.setField(0, values.get("ct"));
>                 outputFuture.complete(Collections.singletonList(row));
>             }
>         });
>     }
>     @Override
>     public void close() throws Exception {
>         if (connection != null){
>             connection.close();
>         }
>         if (redisClient != null){
>             redisClient.shutdown();
>         }
>     }
> }
> {code}
> {code:java}
> tEnv.createTemporarySystemFunction("lookup_redis", RedisAsyncTableFunction.class);
> {code}
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 3, column 31 to line 3, column 48: No match found for function signature lookup_redis(<CHARACTER>)
> 	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> 	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> 	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
> 	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
> 	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
> 	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> 	at hua.mulan.slink.SqlSubmit.callInsertInto(SqlSubmit.java:100)
> 	at hua.mulan.slink.SqlSubmit.callCommand(SqlSubmit.java:75)
> 	at hua.mulan.slink.SqlSubmit.run(SqlSubmit.java:57)
> 	at hua.mulan.slink.SqlSubmit.main(SqlSubmit.java:38)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 3, column 31 to line 3, column 48: No match found for function signature lookup_redis(<CHARACTER>)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> 	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> 	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> 	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
> 	at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
> 	at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
> 	at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
> 	at org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
> 	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
> 	at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
> 	at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> 	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> 	at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
> 	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
> 	... 10 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature lookup_redis(<CHARACTER>)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> 	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> 	at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
> 	... 38 more
> {code}
> Is this a similar issue to the following?
> https://issues.apache.org/jira/browse/FLINK-18520



--
This message was sent by Atlassian Jira
(v8.3.4#803005)