You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Carmen Free <dd...@gmail.com> on 2021/01/05 09:09:23 UTC

flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的

flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的

1、版本说明
flink版本:1.10.2
kafka版本:1.1.0

2、kafka鉴权说明
仅使用了sasl鉴权方式
在kafka客户端有配置 kafka_server-jass.conf、
server.properties、producer.properties、consumer.properties

3、主要配置参数
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule
required username="xx" password="xx-secret";
当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。

4、用于flink SQL连接的jar包
flink-sql-connector-kafka_2.11-1.10.2.jar
flink-jdbc_2.11-1.10.2.jar
flink-csv-1.10.2-sql-jar.jar


5、我的思路
类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka
table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。

6、启动客户端
./bin/sql-client.sh embedded -l sql_lib/
其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包


7、建表语句:
create table test_hello (
name string
) with (
...
...
'connector.properties.sasl.mechanism' = 'PLAIN',
'connector.properties.security.protocol' = 'SASL_PLAINTEXT',
'connector.properties.sasl.jaas.config' =
'org.apache.kafka.comon.security.plain.PlainLoginModule required
username="xx" password="xx-secret";',
'format.type' = 'csv'
);

建表没有问题,可以正常建表。

查询表的时候,就会报错,select * from test_hello;
报错如下:
could not execute sql statement. Reason:
javax.security.auth.login.loginException: unable to find loginModule class:
org.apache.kafka.common.security.plain.PlainLoginModule
但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因?

kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。

Re: flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的

Posted by Carmen Free <dd...@gmail.com>.
好的,非常感谢。

赵一旦 <hi...@gmail.com> 于2021年1月6日周三 下午1:08写道:

> 这个的话去看看KafkaConnector相关的参数,比较新的版本支持配置解析错误忽略。
>
> Carmen Free <dd...@gmail.com> 于2021年1月6日周三 上午10:58写道:
>
> > 感谢帮忙解决问题,确实包的路径有问题,换成这个包就解决了这个问题。
> >
> > 紧接着我这边出现了新的异常
> >
> >
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
> > No content to map due to end-of-input at [Source:UNKONWN; line: -1,
> column:
> > -1;]
> >
> > 这个问题的原因,主要是由于kafka消息为空导致的,只要kafka消息不为空,就可以正常消费。
> >
> > 但是如果遇到了kafka消息为空的情况,这边不能处理吗?
> >
> > 赵一旦 <hi...@gmail.com> 于2021年1月5日周二 下午9:18写道:
> >
> > > 我感觉还是jar的问题。如下尝试下,我懒得去试了。
> > > 将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为
> > > org.apache.flink.kafka.shaded.org.apache.kafka.common.securi
> > > ty.plain.PlainLoginModule
> > >
> > > 因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。
> > >
> > > Carmen Free <dd...@gmail.com> 于2021年1月5日周二 下午5:09写道:
> > >
> > > > flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的
> > > >
> > > > 1、版本说明
> > > > flink版本:1.10.2
> > > > kafka版本:1.1.0
> > > >
> > > > 2、kafka鉴权说明
> > > > 仅使用了sasl鉴权方式
> > > > 在kafka客户端有配置 kafka_server-jass.conf、
> > > > server.properties、producer.properties、consumer.properties
> > > >
> > > > 3、主要配置参数
> > > > sasl.mechanism=PLAIN
> > > > security.protocol=SASL_PLAINTEXT
> > > >
> sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule
> > > > required username="xx" password="xx-secret";
> > > > 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。
> > > >
> > > > 4、用于flink SQL连接的jar包
> > > > flink-sql-connector-kafka_2.11-1.10.2.jar
> > > > flink-jdbc_2.11-1.10.2.jar
> > > > flink-csv-1.10.2-sql-jar.jar
> > > >
> > > >
> > > > 5、我的思路
> > > > 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka
> > > > table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。
> > > >
> > > > 6、启动客户端
> > > > ./bin/sql-client.sh embedded -l sql_lib/
> > > > 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包
> > > >
> > > >
> > > > 7、建表语句:
> > > > create table test_hello (
> > > > name string
> > > > ) with (
> > > > ...
> > > > ...
> > > > 'connector.properties.sasl.mechanism' = 'PLAIN',
> > > > 'connector.properties.security.protocol' = 'SASL_PLAINTEXT',
> > > > 'connector.properties.sasl.jaas.config' =
> > > > 'org.apache.kafka.comon.security.plain.PlainLoginModule required
> > > > username="xx" password="xx-secret";',
> > > > 'format.type' = 'csv'
> > > > );
> > > >
> > > > 建表没有问题,可以正常建表。
> > > >
> > > > 查询表的时候,就会报错,select * from test_hello;
> > > > 报错如下:
> > > > could not execute sql statement. Reason:
> > > > javax.security.auth.login.loginException: unable to find loginModule
> > > class:
> > > > org.apache.kafka.common.security.plain.PlainLoginModule
> > > > 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因?
> > > >
> > > > kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。
> > > >
> > >
> >
>

Re: flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的

Posted by 赵一旦 <hi...@gmail.com>.
这个的话去看看KafkaConnector相关的参数,比较新的版本支持配置解析错误忽略。

Carmen Free <dd...@gmail.com> 于2021年1月6日周三 上午10:58写道:

> 感谢帮忙解决问题,确实包的路径有问题,换成这个包就解决了这个问题。
>
> 紧接着我这边出现了新的异常
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
> No content to map due to end-of-input at [Source:UNKONWN; line: -1, column:
> -1;]
>
> 这个问题的原因,主要是由于kafka消息为空导致的,只要kafka消息不为空,就可以正常消费。
>
> 但是如果遇到了kafka消息为空的情况,这边不能处理吗?
>
> 赵一旦 <hi...@gmail.com> 于2021年1月5日周二 下午9:18写道:
>
> > 我感觉还是jar的问题。如下尝试下,我懒得去试了。
> > 将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为
> > org.apache.flink.kafka.shaded.org.apache.kafka.common.securi
> > ty.plain.PlainLoginModule
> >
> > 因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。
> >
> > Carmen Free <dd...@gmail.com> 于2021年1月5日周二 下午5:09写道:
> >
> > > flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的
> > >
> > > 1、版本说明
> > > flink版本:1.10.2
> > > kafka版本:1.1.0
> > >
> > > 2、kafka鉴权说明
> > > 仅使用了sasl鉴权方式
> > > 在kafka客户端有配置 kafka_server-jass.conf、
> > > server.properties、producer.properties、consumer.properties
> > >
> > > 3、主要配置参数
> > > sasl.mechanism=PLAIN
> > > security.protocol=SASL_PLAINTEXT
> > > sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule
> > > required username="xx" password="xx-secret";
> > > 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。
> > >
> > > 4、用于flink SQL连接的jar包
> > > flink-sql-connector-kafka_2.11-1.10.2.jar
> > > flink-jdbc_2.11-1.10.2.jar
> > > flink-csv-1.10.2-sql-jar.jar
> > >
> > >
> > > 5、我的思路
> > > 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka
> > > table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。
> > >
> > > 6、启动客户端
> > > ./bin/sql-client.sh embedded -l sql_lib/
> > > 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包
> > >
> > >
> > > 7、建表语句:
> > > create table test_hello (
> > > name string
> > > ) with (
> > > ...
> > > ...
> > > 'connector.properties.sasl.mechanism' = 'PLAIN',
> > > 'connector.properties.security.protocol' = 'SASL_PLAINTEXT',
> > > 'connector.properties.sasl.jaas.config' =
> > > 'org.apache.kafka.comon.security.plain.PlainLoginModule required
> > > username="xx" password="xx-secret";',
> > > 'format.type' = 'csv'
> > > );
> > >
> > > 建表没有问题,可以正常建表。
> > >
> > > 查询表的时候,就会报错,select * from test_hello;
> > > 报错如下:
> > > could not execute sql statement. Reason:
> > > javax.security.auth.login.loginException: unable to find loginModule
> > class:
> > > org.apache.kafka.common.security.plain.PlainLoginModule
> > > 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因?
> > >
> > > kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。
> > >
> >
>

Re: flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的

Posted by Carmen Free <dd...@gmail.com>.
感谢帮忙解决问题,确实包的路径有问题,换成这个包就解决了这个问题。

紧接着我这边出现了新的异常
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
No content to map due to end-of-input at [Source:UNKONWN; line: -1, column:
-1;]

这个问题的原因,主要是由于kafka消息为空导致的,只要kafka消息不为空,就可以正常消费。

但是如果遇到了kafka消息为空的情况,这边不能处理吗?

赵一旦 <hi...@gmail.com> 于2021年1月5日周二 下午9:18写道:

> 我感觉还是jar的问题。如下尝试下,我懒得去试了。
> 将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为
> org.apache.flink.kafka.shaded.org.apache.kafka.common.securi
> ty.plain.PlainLoginModule
>
> 因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。
>
> Carmen Free <dd...@gmail.com> 于2021年1月5日周二 下午5:09写道:
>
> > flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的
> >
> > 1、版本说明
> > flink版本:1.10.2
> > kafka版本:1.1.0
> >
> > 2、kafka鉴权说明
> > 仅使用了sasl鉴权方式
> > 在kafka客户端有配置 kafka_server-jass.conf、
> > server.properties、producer.properties、consumer.properties
> >
> > 3、主要配置参数
> > sasl.mechanism=PLAIN
> > security.protocol=SASL_PLAINTEXT
> > sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule
> > required username="xx" password="xx-secret";
> > 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。
> >
> > 4、用于flink SQL连接的jar包
> > flink-sql-connector-kafka_2.11-1.10.2.jar
> > flink-jdbc_2.11-1.10.2.jar
> > flink-csv-1.10.2-sql-jar.jar
> >
> >
> > 5、我的思路
> > 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka
> > table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。
> >
> > 6、启动客户端
> > ./bin/sql-client.sh embedded -l sql_lib/
> > 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包
> >
> >
> > 7、建表语句:
> > create table test_hello (
> > name string
> > ) with (
> > ...
> > ...
> > 'connector.properties.sasl.mechanism' = 'PLAIN',
> > 'connector.properties.security.protocol' = 'SASL_PLAINTEXT',
> > 'connector.properties.sasl.jaas.config' =
> > 'org.apache.kafka.comon.security.plain.PlainLoginModule required
> > username="xx" password="xx-secret";',
> > 'format.type' = 'csv'
> > );
> >
> > 建表没有问题,可以正常建表。
> >
> > 查询表的时候,就会报错,select * from test_hello;
> > 报错如下:
> > could not execute sql statement. Reason:
> > javax.security.auth.login.loginException: unable to find loginModule
> class:
> > org.apache.kafka.common.security.plain.PlainLoginModule
> > 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因?
> >
> > kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。
> >
>

Re: flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的

Posted by 赵一旦 <hi...@gmail.com>.
我感觉还是jar的问题。如下尝试下,我懒得去试了。
将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为
org.apache.flink.kafka.shaded.org.apache.kafka.common.securi
ty.plain.PlainLoginModule

因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。

Carmen Free <dd...@gmail.com> 于2021年1月5日周二 下午5:09写道:

> flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的
>
> 1、版本说明
> flink版本:1.10.2
> kafka版本:1.1.0
>
> 2、kafka鉴权说明
> 仅使用了sasl鉴权方式
> 在kafka客户端有配置 kafka_server-jass.conf、
> server.properties、producer.properties、consumer.properties
>
> 3、主要配置参数
> sasl.mechanism=PLAIN
> security.protocol=SASL_PLAINTEXT
> sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule
> required username="xx" password="xx-secret";
> 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。
>
> 4、用于flink SQL连接的jar包
> flink-sql-connector-kafka_2.11-1.10.2.jar
> flink-jdbc_2.11-1.10.2.jar
> flink-csv-1.10.2-sql-jar.jar
>
>
> 5、我的思路
> 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka
> table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。
>
> 6、启动客户端
> ./bin/sql-client.sh embedded -l sql_lib/
> 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包
>
>
> 7、建表语句:
> create table test_hello (
> name string
> ) with (
> ...
> ...
> 'connector.properties.sasl.mechanism' = 'PLAIN',
> 'connector.properties.security.protocol' = 'SASL_PLAINTEXT',
> 'connector.properties.sasl.jaas.config' =
> 'org.apache.kafka.comon.security.plain.PlainLoginModule required
> username="xx" password="xx-secret";',
> 'format.type' = 'csv'
> );
>
> 建表没有问题,可以正常建表。
>
> 查询表的时候,就会报错,select * from test_hello;
> 报错如下:
> could not execute sql statement. Reason:
> javax.security.auth.login.loginException: unable to find loginModule class:
> org.apache.kafka.common.security.plain.PlainLoginModule
> 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因?
>
> kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。
>