You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by admin <17...@163.com> on 2020/07/14 02:50:39 UTC

自定义的sql connector在sql-cli中运行问题

hi all,
我自定义了一个sql connector,在本地idea里面是调试通过的,数据能正常写入,但是整个flink编译之后,用编译后的包在本地起了standalone集群,在sql-cli中运行报错如下
2020-07-14 10:36:29,148 WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement.
   at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:698) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:551) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_251]
   at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
Caused by: scala.MatchError: null
   at org.apache.flink.table.planner.sinks.TableSinkUtils$.inferSinkPhysicalSchema(TableSinkUtils.scala:165) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:305) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.Option.map(Option.scala:146) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:767) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:571) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:341) ~[flink-table-api-java-bridge_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$17(LocalExecutor.java:691) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:246) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:689) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   ... 9 more
flink版本1.10.1 blink planner
测试的sql为:
Flink SQL> CREATE TABLE prometheus_table (value DOUBLE )WITH ('connector.type' = 'prometheus','connector.job' = 'testJob','connector.metrics' = 'testMetrics','connector.address' = 'localhost:9091');
Flink SQL> insert into prometheus_table select cast(100.01 as double) as value;
看报错的地方应该是
def inferSinkPhysicalSchema(
    queryLogicalType: RowType,
    sink: TableSink[_]): TableSchema = {
  val withChangeFlag = sink match {
    case _: RetractStreamTableSink[_] | _: UpsertStreamTableSink[_] => true
    case _: StreamTableSink[_] => false
    case dsts: DataStreamTableSink[_] => dsts.withChangeFlag
  }
  inferSinkPhysicalSchema(sink.getConsumedDataType, queryLogicalType, withChangeFlag)
}
sink没有match到,但是我的tablesink是实现了AppendStreamTableSink的
想远程debug调试一下,按照网上的方法[1]也没成功

大佬们有没有什么思路指导一下。感谢

[1]https://blog.csdn.net/xianzhen376/article/details/80117637 <https://blog.csdn.net/xianzhen376/article/details/80117637>

Re: 自定义的sql connector在sql-cli中运行问题

Posted by admin <17...@163.com>.
解决了,原因是我同时实现了createTableSink和createStreamTableSink导致
删掉createTableSink就可以了


> 2020年7月14日 上午10:50,admin <17...@163.com> 写道:
> 
> hi all,
> 我自定义了一个sql connector,在本地idea里面是调试通过的,数据能正常写入,但是整个flink编译之后,用编译后的包在本地起了standalone集群,在sql-cli中运行报错如下
> 2020-07-14 10:36:29,148 WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement.
>    at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:698) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:551) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_251]
>    at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> Caused by: scala.MatchError: null
>    at org.apache.flink.table.planner.sinks.TableSinkUtils$.inferSinkPhysicalSchema(TableSinkUtils.scala:165) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:305) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at scala.Option.map(Option.scala:146) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:767) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:571) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:341) ~[flink-table-api-java-bridge_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$17(LocalExecutor.java:691) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:246) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:689) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>    ... 9 more
> flink版本1.10.1 blink planner
> 测试的sql为:
> Flink SQL> CREATE TABLE prometheus_table (value DOUBLE )WITH ('connector.type' = 'prometheus','connector.job' = 'testJob','connector.metrics' = 'testMetrics','connector.address' = 'localhost:9091');
> Flink SQL> insert into prometheus_table select cast(100.01 as double) as value;
> 看报错的地方应该是
> def inferSinkPhysicalSchema(
>     queryLogicalType: RowType,
>     sink: TableSink[_]): TableSchema = {
>   val withChangeFlag = sink match {
>     case _: RetractStreamTableSink[_] | _: UpsertStreamTableSink[_] => true
>     case _: StreamTableSink[_] => false
>     case dsts: DataStreamTableSink[_] => dsts.withChangeFlag
>   }
>   inferSinkPhysicalSchema(sink.getConsumedDataType, queryLogicalType, withChangeFlag)
> }
> sink没有match到,但是我的tablesink是实现了AppendStreamTableSink的
> 想远程debug调试一下,按照网上的方法[1]也没成功
> 
> 大佬们有没有什么思路指导一下。感谢
> 
> [1]https://blog.csdn.net/xianzhen376/article/details/80117637 <https://blog.csdn.net/xianzhen376/article/details/80117637>