Posted to user-zh@flink.apache.org by "hl9902@126.com" <hl...@126.com> on 2020/09/28 01:16:08 UTC

Error when executing SQL in sql-cli

The Flink version is 1.10.2. The problem reproduces as follows; could anyone tell me the cause?

./sql-client.sh  embedded
Flink SQL> show tables ;
[INFO] Result was empty.

Flink SQL> CREATE TABLE tx ( 
>                     account_id  BIGINT, 
>                     amount      BIGINT, 
>                     transaction_time TIMESTAMP(3), 
>                     WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND 
>                 ) WITH ( 
>                     'connector' = 'kafka', 
>                     'topic'     = 'heli01', 
>                     'properties.bootstrap.servers' = '10.100.51.56:9092', 
>                     'format'    = 'csv' 
>                 );
[INFO] Table has been created.

Flink SQL> select * from tx ;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector=kafka
format=csv
properties.bootstrap.servers=10.100.51.56:9092
schema.0.data-type=BIGINT
schema.0.name=account_id
schema.1.data-type=BIGINT
schema.1.name=amount
schema.2.data-type=TIMESTAMP(3)
schema.2.name=transaction_time
schema.watermark.0.rowtime=transaction_time
schema.watermark.0.strategy.data-type=TIMESTAMP(3)
schema.watermark.0.strategy.expr=`transaction_time` - INTERVAL '5' SECOND
topic=heli01

The following factories have been considered:
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory



hl9902@126.com

Re: Re: Error when executing SQL in sql-cli

Posted by Benchao Li <li...@apache.org>.
The Kafka dependency should be the shaded version, i.e. flink-*sql*-connector-kafka***.jar.
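
Concretely, the Kafka-related part of lib would then contain just the one jar (a sketch; the exact version and Scala suffix must match your setup, cf. the listings later in this thread):

/opt/flink-1.10.2/lib/flink-sql-connector-kafka_2.11-1.10.2.jar

with the unshaded flink-connector-kafka-*.jar, flink-connector-kafka-base-*.jar and kafka-clients-*.jar removed.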

hl9902@126.com <hl...@126.com> wrote on Mon, Sep 28, 2020 at 10:29 AM:

> The syntax was indeed wrong. After I switched to the 1.10 syntax, executing the SQL raised another error:
> Flink SQL> select * from tx ;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException:
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>
> The related jars under lib are as follows:
> [root@rcx51101 lib]# pwd
> /opt/flink-1.10.2/lib
> [root@rcx51101 lib]# ll | grep kafka
> -rw-rw-r-- 1 test test     26169 Sep 28 10:21 flink-connector-kafka-0.10_2.11-1.10.2.jar
> -rw-rw-r-- 1 test test     54969 Sep 28 10:21 flink-connector-kafka-0.11_2.11-1.10.2.jar
> -rw-rw-r-- 1 test test     37642 Sep 28 10:21 flink-connector-kafka-0.9_2.11-1.10.2.jar
> -rw-rw-r-- 1 test test     81912 Aug 17 16:41 flink-connector-kafka_2.12-1.10.2.jar
> -rw-rw-r-- 1 test test    106632 Sep 28 10:22 flink-connector-kafka-base_2.11-1.10.2.jar
> -rw-rw-r-- 1 test test    106632 Aug 17 16:36 flink-connector-kafka-base_2.12-1.10.2.jar
> -rw-rw-r-- 1 test test   1893564 Jul 24  2018 kafka-clients-2.0.0.jar
>
>
>
> hl9902@126.com
>
> From: 111
> Date: 2020-09-28 09:23
> To: user-zh@flink.apache.org
> Subject: Re: Error when executing SQL in sql-cli
> It looks like you are using Flink 1.11 syntax.
> Try changing it to the Flink 1.10 syntax; reference docs:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>
>
> xinghalo
> xinghalo@163.com
>
>
> On 2020-09-28 09:16, hl9902@126.com<hl...@126.com> wrote:
> The Flink version is 1.10.2. The problem reproduces as follows; could anyone tell me the cause?
>
> ./sql-client.sh  embedded
> Flink SQL> show tables ;
> [INFO] Result was empty.
>
> Flink SQL> CREATE TABLE tx (
> account_id  BIGINT,
> amount      BIGINT,
> transaction_time TIMESTAMP(3),
> WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic'     = 'heli01',
> 'properties.bootstrap.servers' = '10.100.51.56:9092',
> 'format'    = 'csv'
> );
> [INFO] Table has been created.
>
> Flink SQL> select * from tx ;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find
> a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
>
> Reason: Required context properties mismatch.
>
> The following properties are requested:
> connector=kafka
> format=csv
> properties.bootstrap.servers=10.100.51.56:9092
> schema.0.data-type=BIGINT
> schema.0.name=account_id
> schema.1.data-type=BIGINT
> schema.1.name=amount
> schema.2.data-type=TIMESTAMP(3)
> schema.2.name=transaction_time
> schema.watermark.0.rowtime=transaction_time
> schema.watermark.0.strategy.data-type=TIMESTAMP(3)
> schema.watermark.0.strategy.expr=`transaction_time` - INTERVAL '5' SECOND
> topic=heli01
>
> The following factories have been considered:
> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>
>
>
> hl9902@126.com
>


-- 

Best,
Benchao Li

Re: Error when executing SQL in sql-cli

Posted by Benchao Li <li...@apache.org>.
Approach (2) is effectively a shaded package: all of its compile-scope dependencies get bundled in.
With approach (1), you have to add all of this connector's dependencies by hand, e.g. the kafka-clients
you mentioned. And if kafka-clients' own dependencies were not bundled into the kafka-clients jar, you
need to pull in those compile-scope dependencies as well. So it amounts to redoing Maven's dependency
resolution by hand, and pulling everything in would mean quite a lot of jars.

If you have modified kafka-clients, I suggest rebuilding your own flink-sql-connector-kafka against the modified kafka-clients.
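
A minimal sketch of the shading step such a rebuild would need, assuming a Maven project that depends on the patched kafka-clients (the relocation prefix matches the one visible in the shaded error messages later in this thread; everything else is illustrative):

<!-- maven-shade-plugin: relocate the Kafka classes so they cannot
     clash with an unshaded kafka-clients elsewhere on the classpath -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>org.apache.kafka</pattern>
            <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>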

赵一旦 <hi...@gmail.com> wrote on Mon, Sep 28, 2020 at 5:51 PM:

>
> I looked at the pom: flink-sql-connector-kafka depends on flink-connector-kafka-**, which in turn depends on flink-connector-kafka-base-** and kafka-clients.
> flink-sql-connector-kafka then applies shading.
>
> So from that, my (1) and (2) should in theory have the same effect.
> ————————————————————————————————————————————————————————
>
> By the way, I changed the way my kafka-clients reads SSL certificates, to support HDFS and other distributed protocols (it directly reuses the filesystem implementation from flink-core).
>
> 赵一旦 <hi...@gmail.com> wrote on Mon, Sep 28, 2020 at 5:42 PM:
>
> >
> > I don't quite get this: what is the difference between (1) flink-connector-kafka_2.11-1.11.2.jar + flink-connector-kafka-base_2.11-1.11.2.jar + kafka-clients-0.11.0.3.jar
> > and (2) flink-sql-connector-kafka**.jar?
> >
> > Can I use (1)? Because I have modified the source of my kafka-clients.
> >
> > Leonard Xu <xb...@gmail.com> wrote on Mon, Sep 28, 2020 at 4:36 PM:
> >
> >> Hi
> >> Benchao's reply is correct.
> >> When using the SQL client you don't need the DataStream connector jars; just use the corresponding SQL connector jar
> >> flink-*sql*-connector-kafka***.jar and remove the other jars you added.
> >>
> >>
> >> > Related jars in lib:
> >> > flink-connector-kafka_2.12-1.10.2.jar
> >> > kafka-clients-0.11.0.3.jar
> >>
> >> Best,
> >> Leonard
> >
> >
>


-- 

Best,
Benchao Li

Re: Error when executing SQL in sql-cli

Posted by 赵一旦 <hi...@gmail.com>.
I looked at the pom: flink-sql-connector-kafka depends on flink-connector-kafka-**, which in turn depends on flink-connector-kafka-base-** and kafka-clients.
flink-sql-connector-kafka then applies shading.

So from that, my (1) and (2) should in theory have the same effect.
————————————————————————————————————————————————————————
By the way, I changed the way my kafka-clients reads SSL certificates, to support HDFS and other distributed protocols (it directly reuses the filesystem implementation from flink-core).

赵一旦 <hi...@gmail.com> wrote on Mon, Sep 28, 2020 at 5:42 PM:

> I don't quite get this: what is the difference between (1) flink-connector-kafka_2.11-1.11.2.jar + flink-connector-kafka-base_2.11-1.11.2.jar + kafka-clients-0.11.0.3.jar
> and (2) flink-sql-connector-kafka**.jar?
>
> Can I use (1)? Because I have modified the source of my kafka-clients.
>
Leonard Xu <xb...@gmail.com> wrote on Mon, Sep 28, 2020 at 4:36 PM:
>
>> Hi
>> Benchao's reply is correct.
>> When using the SQL client you don't need the DataStream connector jars; just use the corresponding SQL connector jar
>> flink-*sql*-connector-kafka***.jar and remove the other jars you added.
>>
>>
>> > Related jars in lib:
>> > flink-connector-kafka_2.12-1.10.2.jar
>> > kafka-clients-0.11.0.3.jar
>>
>> Best,
>> Leonard
>
>

Re: Error when executing SQL in sql-cli

Posted by 赵一旦 <hi...@gmail.com>.
I don't quite get this: what is the difference between (1) flink-connector-kafka_2.11-1.11.2.jar + flink-connector-kafka-base_2.11-1.11.2.jar + kafka-clients-0.11.0.3.jar
and (2) flink-sql-connector-kafka**.jar?

Can I use (1)? Because I have modified the source of my kafka-clients.

Leonard Xu <xb...@gmail.com> wrote on Mon, Sep 28, 2020 at 4:36 PM:

> Hi
> Benchao's reply is correct.
> When using the SQL client you don't need the DataStream connector jars; just use the corresponding SQL connector jar
> flink-*sql*-connector-kafka***.jar and remove the other jars you added.
>
>
> > Related jars in lib:
> > flink-connector-kafka_2.12-1.10.2.jar
> > kafka-clients-0.11.0.3.jar
>
> Best,
> Leonard

Re: Re: Error when executing SQL in sql-cli

Posted by "hl9902@126.com" <hl...@126.com>.
I did not modify Kafka; I used the official jars as-is. Later I retried with version 1.11.2 and it worked without any errors.
I won't dig into this issue any further.



hl9902@126.com
 
From: Benchao Li
Date: 2020-09-29 18:17
To: user-zh
Subject: Re: Re: Error when executing SQL in sql-cli
This error looks rather strange. Normally the classes inside flink-sql-connector-kafka_2.11-1.10.2.jar should all be shaded,
yet a non-shaded ByteArrayDeserializer shows up in the error.
My feeling is that some special logic you added yourself caused this. Could you describe what changes you made to the kafka connector?
 
hl9902@126.com <hl...@126.com> wrote on Mon, Sep 28, 2020 at 6:06 PM:
 
> I retried following your suggestion, and got another error:
> Flink SQL> CREATE TABLE tx (
> >                     account_id  BIGINT,
> >                     amount      BIGINT,
> >                     transaction_time TIMESTAMP(3),
> >                     WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
> >                 ) WITH (
> >                     'connector.type' = 'kafka',
> >                     'connector.version' = 'universal',
> >                     'connector.topic'     = 'heli01',
> >                     'connector.properties.group.id' = 'heli-test',
> >                     'connector.properties.bootstrap.servers' = '10.100.51.56:9092',
> >                     'connector.startup-mode' = 'earliest-offset',
> >                     'format.type'    = 'csv'
> >                 );
> [INFO] Table has been created.
>
> Flink SQL> show tables ;
> tx
>
> Flink SQL> select * from tx ;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.serialization.ByteArrayDeserializer is not an
> instance of
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer
>
> Attachment: list of jars in lib
> [test@rcx51101 lib]$ pwd
> /opt/flink-1.10.2/lib
>
> flink-csv-1.10.2.jar
> flink-dist_2.12-1.10.2.jar
> flink-jdbc_2.12-1.10.2.jar
> flink-json-1.10.2.jar
> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
> flink-sql-connector-kafka_2.11-1.10.2.jar
> flink-table_2.12-1.10.2.jar
> flink-table-blink_2.12-1.10.2.jar
> log4j-1.2.17.jar
> mysql-connector-java-5.1.48.jar
> slf4j-log4j12-1.7.15.jar
>
>
>
>
> hl9902@126.com
>
> From: Leonard Xu
> Date: 2020-09-28 16:36
> To: user-zh
> Subject: Re: Error when executing SQL in sql-cli
> Hi
> Benchao's reply is correct.
> When using the SQL client you don't need the DataStream connector jars; just use the corresponding SQL connector jar
> flink-*sql*-connector-kafka***.jar and remove the other jars you added.
>
>
> > Related jars in lib:
> > flink-connector-kafka_2.12-1.10.2.jar
> > kafka-clients-0.11.0.3.jar
>
> Best,
> Leonard
>
 
 
-- 
 
Best,
Benchao Li

Re: Re: Error when executing SQL in sql-cli

Posted by Benchao Li <li...@apache.org>.
This error looks rather strange. Normally the classes inside flink-sql-connector-kafka_2.11-1.10.2.jar should all be shaded,
yet a non-shaded ByteArrayDeserializer shows up in the error.
My feeling is that some special logic you added yourself caused this. Could you describe what changes you made to the kafka connector?
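
One quick way to verify what the jar actually ships (an illustrative shell one-liner using standard jar/grep tooling, run from the lib directory; not from the original thread):

jar tf flink-sql-connector-kafka_2.11-1.10.2.jar | grep ByteArrayDeserializer

If the jar is properly shaded, the hits should all be under org/apache/flink/kafka/shaded/, not under plain org/apache/kafka/.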

hl9902@126.com <hl...@126.com> wrote on Mon, Sep 28, 2020 at 6:06 PM:

> I retried following your suggestion, and got another error:
> Flink SQL> CREATE TABLE tx (
> >                     account_id  BIGINT,
> >                     amount      BIGINT,
> >                     transaction_time TIMESTAMP(3),
> >                     WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
> >                 ) WITH (
> >                     'connector.type' = 'kafka',
> >                     'connector.version' = 'universal',
> >                     'connector.topic'     = 'heli01',
> >                     'connector.properties.group.id' = 'heli-test',
> >                     'connector.properties.bootstrap.servers' = '10.100.51.56:9092',
> >                     'connector.startup-mode' = 'earliest-offset',
> >                     'format.type'    = 'csv'
> >                 );
> [INFO] Table has been created.
>
> Flink SQL> show tables ;
> tx
>
> Flink SQL> select * from tx ;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.serialization.ByteArrayDeserializer is not an
> instance of
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer
>
> Attachment: list of jars in lib
> [test@rcx51101 lib]$ pwd
> /opt/flink-1.10.2/lib
>
> flink-csv-1.10.2.jar
> flink-dist_2.12-1.10.2.jar
> flink-jdbc_2.12-1.10.2.jar
> flink-json-1.10.2.jar
> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
> flink-sql-connector-kafka_2.11-1.10.2.jar
> flink-table_2.12-1.10.2.jar
> flink-table-blink_2.12-1.10.2.jar
> log4j-1.2.17.jar
> mysql-connector-java-5.1.48.jar
> slf4j-log4j12-1.7.15.jar
>
>
>
>
> hl9902@126.com
>
> From: Leonard Xu
> Date: 2020-09-28 16:36
> To: user-zh
> Subject: Re: Error when executing SQL in sql-cli
> Hi
> Benchao's reply is correct.
> When using the SQL client you don't need the DataStream connector jars; just use the corresponding SQL connector jar
> flink-*sql*-connector-kafka***.jar and remove the other jars you added.
>
>
> > Related jars in lib:
> > flink-connector-kafka_2.12-1.10.2.jar
> > kafka-clients-0.11.0.3.jar
>
> Best,
> Leonard
>


-- 

Best,
Benchao Li

Re: Re: Error when executing SQL in sql-cli

Posted by zhisheng <zh...@gmail.com>.
The same problem also exists on the latest master branch. I filed an issue describing the whole procedure:
https://issues.apache.org/jira/browse/FLINK-19995

hl9902@126.com <hl...@126.com> wrote on Mon, Sep 28, 2020 at 6:06 PM:

> I retried following your suggestion, and got another error:
> Flink SQL> CREATE TABLE tx (
> >                     account_id  BIGINT,
> >                     amount      BIGINT,
> >                     transaction_time TIMESTAMP(3),
> >                     WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
> >                 ) WITH (
> >                     'connector.type' = 'kafka',
> >                     'connector.version' = 'universal',
> >                     'connector.topic'     = 'heli01',
> >                     'connector.properties.group.id' = 'heli-test',
> >                     'connector.properties.bootstrap.servers' = '10.100.51.56:9092',
> >                     'connector.startup-mode' = 'earliest-offset',
> >                     'format.type'    = 'csv'
> >                 );
> [INFO] Table has been created.
>
> Flink SQL> show tables ;
> tx
>
> Flink SQL> select * from tx ;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.serialization.ByteArrayDeserializer is not an
> instance of
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer
>
> Attachment: list of jars in lib
> [test@rcx51101 lib]$ pwd
> /opt/flink-1.10.2/lib
>
> flink-csv-1.10.2.jar
> flink-dist_2.12-1.10.2.jar
> flink-jdbc_2.12-1.10.2.jar
> flink-json-1.10.2.jar
> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
> flink-sql-connector-kafka_2.11-1.10.2.jar
> flink-table_2.12-1.10.2.jar
> flink-table-blink_2.12-1.10.2.jar
> log4j-1.2.17.jar
> mysql-connector-java-5.1.48.jar
> slf4j-log4j12-1.7.15.jar
>
>
>
>
> hl9902@126.com
>
> From: Leonard Xu
> Date: 2020-09-28 16:36
> To: user-zh
> Subject: Re: Error when executing SQL in sql-cli
> Hi
> Benchao's reply is correct.
> When using the SQL client you don't need the DataStream connector jars; just use the corresponding SQL connector jar
> flink-*sql*-connector-kafka***.jar and remove the other jars you added.
>
>
> > Related jars in lib:
> > flink-connector-kafka_2.12-1.10.2.jar
> > kafka-clients-0.11.0.3.jar
>
> Best,
> Leonard
>

Re: Re: Error when executing SQL in sql-cli

Posted by "hl9902@126.com" <hl...@126.com>.
I retried following your suggestion, and got another error:
Flink SQL> CREATE TABLE tx (
>                     account_id  BIGINT,
>                     amount      BIGINT,
>                     transaction_time TIMESTAMP(3),
>                     WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
>                 ) WITH (
>                     'connector.type' = 'kafka',
>                     'connector.version' = 'universal',
>                     'connector.topic'     = 'heli01',
>                     'connector.properties.group.id' = 'heli-test',
>                     'connector.properties.bootstrap.servers' = '10.100.51.56:9092',
>                     'connector.startup-mode' = 'earliest-offset',
>                     'format.type'    = 'csv'
>                 );
[INFO] Table has been created.

Flink SQL> show tables ;
tx

Flink SQL> select * from tx ;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer

Attachment: list of jars in lib
[test@rcx51101 lib]$ pwd
/opt/flink-1.10.2/lib

flink-csv-1.10.2.jar
flink-dist_2.12-1.10.2.jar
flink-jdbc_2.12-1.10.2.jar
flink-json-1.10.2.jar
flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
flink-sql-connector-kafka_2.11-1.10.2.jar
flink-table_2.12-1.10.2.jar
flink-table-blink_2.12-1.10.2.jar
log4j-1.2.17.jar
mysql-connector-java-5.1.48.jar
slf4j-log4j12-1.7.15.jar




hl9902@126.com
 
From: Leonard Xu
Date: 2020-09-28 16:36
To: user-zh
Subject: Re: Error when executing SQL in sql-cli
Hi
Benchao's reply is correct.
When using the SQL client you don't need the DataStream connector jars; just use the corresponding SQL connector jar flink-*sql*-connector-kafka***.jar and remove the other jars you added.
 
 
> Related jars in lib:
> flink-connector-kafka_2.12-1.10.2.jar
> kafka-clients-0.11.0.3.jar  
 
Best,
Leonard

Re: Error when executing SQL in sql-cli

Posted by Leonard Xu <xb...@gmail.com>.
Hi
Benchao's reply is correct.
When using the SQL client you don't need the DataStream connector jars; just use the corresponding SQL connector jar flink-*sql*-connector-kafka***.jar and remove the other jars you added.


> Related jars in lib:
> flink-connector-kafka_2.12-1.10.2.jar
> kafka-clients-0.11.0.3.jar  

Best,
Leonard

Re: Re: Error when executing SQL in sql-cli

Posted by "hl9902@126.com" <hl...@126.com>.
I tried the steps below and still get the same error:
Error: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

Related jars in lib:
flink-connector-kafka_2.12-1.10.2.jar
kafka-clients-0.11.0.3.jar (previously kafka-clients-2.0.0.jar)



hl9902@126.com
 
From: 111
Date: 2020-09-28 10:41
To: user-zh@flink.apache.org
Cc: user-zh
Subject: Re: Error when executing SQL in sql-cli
Hi,
 
 
At a glance, I'd suggest troubleshooting as follows:
1. There are too many flink-connector-kafka* jars; keep only the version you need.
2. The kafka-clients* jar version is too high; check which version you actually need.
 
 
Best, xingoo

Re: Error when executing SQL in sql-cli

Posted by 111 <xi...@163.com>.
Hi,


At a glance, I'd suggest troubleshooting as follows (see the example after this list):
1. There are too many flink-connector-kafka* jars; keep only the version you need.
2. The kafka-clients* jar version is too high; check which version you actually need.
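
For instance, for the DataStream-connector route one consistent set would be (a hypothetical selection; pick a single Scala suffix, and take the kafka-clients version from the connector's own pom):

flink-connector-kafka_2.11-1.10.2.jar
flink-connector-kafka-base_2.11-1.10.2.jar
kafka-clients-<version declared in the connector pom>.jar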


Best, xingoo

Re: Re: Error when executing SQL in sql-cli

Posted by "hl9902@126.com" <hl...@126.com>.
The syntax was indeed wrong. After I switched to the 1.10 syntax, executing the SQL raised another error:
Flink SQL> select * from tx ;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

The related jars under lib are as follows:
[root@rcx51101 lib]# pwd
/opt/flink-1.10.2/lib
[root@rcx51101 lib]# ll | grep kafka
-rw-rw-r-- 1 test test     26169 Sep 28 10:21 flink-connector-kafka-0.10_2.11-1.10.2.jar
-rw-rw-r-- 1 test test     54969 Sep 28 10:21 flink-connector-kafka-0.11_2.11-1.10.2.jar
-rw-rw-r-- 1 test test     37642 Sep 28 10:21 flink-connector-kafka-0.9_2.11-1.10.2.jar
-rw-rw-r-- 1 test test     81912 Aug 17 16:41 flink-connector-kafka_2.12-1.10.2.jar
-rw-rw-r-- 1 test test    106632 Sep 28 10:22 flink-connector-kafka-base_2.11-1.10.2.jar
-rw-rw-r-- 1 test test    106632 Aug 17 16:36 flink-connector-kafka-base_2.12-1.10.2.jar
-rw-rw-r-- 1 test test   1893564 Jul 24  2018 kafka-clients-2.0.0.jar



hl9902@126.com
 
From: 111
Date: 2020-09-28 09:23
To: user-zh@flink.apache.org
Subject: Re: Error when executing SQL in sql-cli
It looks like you are using Flink 1.11 syntax.
Try changing it to the Flink 1.10 syntax; reference docs: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
 
 
xinghalo
xinghalo@163.com
 
 
On 2020-09-28 09:16, hl9902@126.com<hl...@126.com> wrote:
The Flink version is 1.10.2. The problem reproduces as follows; could anyone tell me the cause?
 
./sql-client.sh  embedded
Flink SQL> show tables ;
[INFO] Result was empty.
 
Flink SQL> CREATE TABLE tx (
account_id  BIGINT,
amount      BIGINT,
transaction_time TIMESTAMP(3),
WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic'     = 'heli01',
'properties.bootstrap.servers' = '10.100.51.56:9092',
'format'    = 'csv'
);
[INFO] Table has been created.
 
Flink SQL> select * from tx ;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
 
Reason: Required context properties mismatch.
 
The following properties are requested:
connector=kafka
format=csv
properties.bootstrap.servers=10.100.51.56:9092
schema.0.data-type=BIGINT
schema.0.name=account_id
schema.1.data-type=BIGINT
schema.1.name=amount
schema.2.data-type=TIMESTAMP(3)
schema.2.name=transaction_time
schema.watermark.0.rowtime=transaction_time
schema.watermark.0.strategy.data-type=TIMESTAMP(3)
schema.watermark.0.strategy.expr=`transaction_time` - INTERVAL '5' SECOND
topic=heli01
 
The following factories have been considered:
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
 
 
 
hl9902@126.com

Re: Error when executing SQL in sql-cli

Posted by 111 <xi...@163.com>.
It looks like you are using Flink 1.11 syntax.
Try changing it to the Flink 1.10 syntax; reference docs: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
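
For illustration, a 1.10-style rewrite of the DDL from the original post could look like this (a sketch: option keys follow the 1.10 'connector.*' format from the docs above; topic and server values are the original poster's, and the shape matches the DDL used elsewhere in this thread):

CREATE TABLE tx (
    account_id  BIGINT,
    amount      BIGINT,
    transaction_time TIMESTAMP(3),
    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'heli01',
    'connector.properties.bootstrap.servers' = '10.100.51.56:9092',
    'format.type' = 'csv'
);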


xinghalo
xinghalo@163.com


On 2020-09-28 09:16, hl9902@126.com<hl...@126.com> wrote:
The Flink version is 1.10.2. The problem reproduces as follows; could anyone tell me the cause?

./sql-client.sh  embedded
Flink SQL> show tables ;
[INFO] Result was empty.

Flink SQL> CREATE TABLE tx (
account_id  BIGINT,
amount      BIGINT,
transaction_time TIMESTAMP(3),
WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic'     = 'heli01',
'properties.bootstrap.servers' = '10.100.51.56:9092',
'format'    = 'csv'
);
[INFO] Table has been created.

Flink SQL> select * from tx ;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector=kafka
format=csv
properties.bootstrap.servers=10.100.51.56:9092
schema.0.data-type=BIGINT
schema.0.name=account_id
schema.1.data-type=BIGINT
schema.1.name=amount
schema.2.data-type=TIMESTAMP(3)
schema.2.name=transaction_time
schema.watermark.0.rowtime=transaction_time
schema.watermark.0.strategy.data-type=TIMESTAMP(3)
schema.watermark.0.strategy.expr=`transaction_time` - INTERVAL '5' SECOND
topic=heli01

The following factories have been considered:
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory



hl9902@126.com