You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 瞿叶奇 <38...@qq.com> on 2021/01/30 07:14:18 UTC

回复:问题求助(Pyflink)

老师,你好,消费是没有任何问题,可以正常消费。







------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <appleyuchi@163.com&gt;;
发送时间:&nbsp;2021年1月30日(星期六) 下午3:08
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re:问题求助(Pyflink)



先看下kafka能否通过命令行消费数据.

命令行检查确保能消费,再使用Flink.













在 2021-01-30 14:25:57,"瞿叶奇" <389243409@qq.com&gt; 写道:

老师,您好,我想通过Flink消费kafka写本地csv文件,目前遇到的问题是Flink、Kafka都是kerberos认证的集群,而且我是用的是pyflink,现在程序能执行,不报错,但是不消费数据,csv文件没有结果,但是文件日期修改日期一直在更新。怀疑对接kafka 存在问题,希望老师能够给解决疑惑。
1)Kafka生产数据:

2)pyflink 程序


#!/usr/bin/python3.7
# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialect
from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.enable_checkpointing(3000)
st_env = StreamTableEnvironment.create(s_env, TableConfig())
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")
st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect", "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002").property("security.protocol", 'SASL_PLAINTEXT').property("sasl.kerberos.service.name", 'kafka').property("kerberos.domain.name", 'hadoop.hadoop.com').property("bootstrap.servers", "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),DataTypes.FIELD("name", DataTypes.STRING())]))).with_schema(Schema().field("id", DataTypes.BIGINT()).field("name", DataTypes.STRING())).register_table_source("sourceKafka")
fieldNames = ["id", "name"]
fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()]
csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", 1, WriteMode.OVERWRITE)
st_env.register_table_sink("csvTableSink", csvSink)
resultQuery = st_env.sql_query("select id,name from sourceKafka")
resultQuery = resultQuery.insert_into("csvTableSink")
st_env.execute("pyflink-kafka-v2")
3)pyflink-shell.sh local

4)运行结果
在pyflink-shell local运行程序的同时,启用生产者生产数据,查看结果文件如下:

可以看出文件确实在被更新,文件的修改时间在变化,但是里面是空的,一方面希望pyflink可以增加打印到控制台的功能,一方面希望老师能给出对接kerberos认证的kafka的案例,我是陕西国网用电信息采集系统新架构改造的工程师,我们计划使用flink测试kafka-hdfs数据的数据传输。希望老师能给出一个案例,帮助我们完成测试。

Re: 问题求助(Pyflink)

Posted by Shuiqiang Chen <ac...@gmail.com>.
抱歉,漏了文档链接
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/kafka.html#enabling-kerberos-authentication

Shuiqiang Chen <ac...@gmail.com> 于2021年1月30日周六 下午4:32写道:

> Hi,
> 按照这个文档, 在flink-conf.yaml里配置了security.kerberos.login.keytab 和 security.kerberos.login.principal这两个属性了吗?
> 还有就是jaas.conf文件在各个task manager所在的机器都能访问到吗?
>
> 瞿叶奇 <38...@qq.com> 于2021年1月30日周六 下午4:15写道:
>
>> 老师,您好,
>> 1)报错信息在附件内,flink-root-python-node-master1aSdM.log文件。报错信息如下:
>> Caused by: java.lang.IllegalArgumentException: Could not find a
>> 'KafkaClient' entry in the JAAS configuration. System property
>> 'java.security.auth.login.config' is not set  at
>> org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
>> 2)这是Flink配置里的jaas.conf
>> 3)pyspark对接kafka写hdfs没有问题,这是spark的jaas.conf
>> 3.1)这是spark配置里的jaas.conf
>>
>> 3.2)这是spark配置里的jaas-zk.conf
>>
>>
>> 是不是要修改这个文件呢?百度给出的都是java的修正,不知道这个需要如何修改。
>>
>>
>> ------------------ 原始邮件 ------------------
>> *发件人:* "user-zh" <ac...@gmail.com>;
>> *发送时间:* 2021年1月30日(星期六) 下午3:49
>> *收件人:* "user-zh"<us...@flink.apache.org>;
>> *主题:* Re: 问题求助(Pyflink)
>>
>> 你好,
>> 可以看下source task所在task manager 的日志里看看 consumer 有没有成功获取到kafka
>> partition相关meta信息和认证相关是否成功的信息。
>>
>> 瞿叶奇 <38...@qq.com> 于2021年1月30日周六 下午3:14写道:
>>
>> > 老师,你好,消费是没有任何问题,可以正常消费。
>> >
>> >
>> >
>> >
>> > ------------------ 原始邮件 ------------------
>> > *发件人:* "user-zh" <ap...@163.com>;
>> > *发送时间:* 2021年1月30日(星期六) 下午3:08
>> > *收件人:* "user-zh"<us...@flink.apache.org>;
>> > *主题:* Re:问题求助(Pyflink)
>> >
>> > 先看下kafka能否通过命令行消费数据.
>> >
>> > 命令行检查确保能消费,再使用Flink.
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > 在 2021-01-30 14:25:57,"瞿叶奇" <38...@qq.com> 写道:
>> >
>> >
>> 老师,您好,我想通过Flink消费kafka写本地csv文件,目前遇到的问题是Flink、Kafka都是kerberos认证的集群,而且我是用的是pyflink,现在程序能执行,不报错,但是不消费数据,csv文件没有结果,但是文件日期修改日期一直在更新。怀疑对接kafka
>> > 存在问题,希望老师能够给解决疑惑。
>> > 1)Kafka生产数据:
>> >
>> > 2)pyflink 程序
>> >
>> >
>> > #!/usr/bin/python3.7
>> > # -*- coding: UTF-8 -*-
>> > from pyflink.datastream import StreamExecutionEnvironment,
>> > CheckpointingMode
>> > from pyflink.table import StreamTableEnvironment, TableConfig,
>> DataTypes,
>> > CsvTableSink, WriteMode, SqlDialect
>> > from pyflink.table.descriptors import
>> FileSystem,OldCsv,Schema,Kafka,Json
>> > s_env = StreamExecutionEnvironment.get_execution_environment()
>> > s_env.set_parallelism(1)
>> > s_env.enable_checkpointing(3000)
>> > st_env = StreamTableEnvironment.create(s_env, TableConfig())
>> > st_env.use_catalog("default_catalog")
>> > st_env.use_database("default_database")
>> >
>> st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect",
>> > "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002
>> ").property("security.protocol",
>> > 'SASL_PLAINTEXT').property("sasl.kerberos.service.name",
>> > 'kafka').property("kerberos.domain.name", 'hadoop.hadoop.com
>> ').property("bootstrap.servers",
>> > "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007
>> ")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id",
>> > DataTypes.BIGINT()),DataTypes.FIELD("name",
>> > DataTypes.STRING())]))).with_schema(Schema().field("id",
>> > DataTypes.BIGINT()).field("name",
>> > DataTypes.STRING())).register_table_source("sourceKafka")
>> > fieldNames = ["id", "name"]
>> > fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()]
>> > csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv",
>> ",",
>> > 1, WriteMode.OVERWRITE)
>> > st_env.register_table_sink("csvTableSink", csvSink)
>> > resultQuery = st_env.sql_query("select id,name from sourceKafka")
>> > resultQuery = resultQuery.insert_into("csvTableSink")
>> > st_env.execute("pyflink-kafka-v2")
>> > 3)pyflink-shell.sh local
>> >
>> > 4)运行结果
>> > 在pyflink-shell local运行程序的同时,启用生产者生产数据,查看结果文件如下:
>> >
>> >
>> >
>> 可以看出文件确实在被更新,文件的修改时间在变化,但是里面是空的,一方面希望pyflink可以增加打印到控制台的功能,一方面希望老师能给出对接kerberos认证的kafka的案例,我是陕西国网用电信息采集系统新架构改造的工程师,我们计划使用flink测试kafka-hdfs数据的数据传输。希望老师能给出一个案例,帮助我们完成测试。
>> >
>> >
>> >
>> >
>>
>

Re: 问题求助(Pyflink)

Posted by Shuiqiang Chen <ac...@gmail.com>.
Hi,
按照这个文档, 在flink-conf.yaml里配置了security.kerberos.login.keytab 和
security.kerberos.login.principal这两个属性了吗?
还有就是jaas.conf文件在各个task manager所在的机器都能访问到吗?

瞿叶奇 <38...@qq.com> 于2021年1月30日周六 下午4:15写道:

> 老师,您好,
> 1)报错信息在附件内,flink-root-python-node-master1aSdM.log文件。报错信息如下:
> Caused by: java.lang.IllegalArgumentException: Could not find a
> 'KafkaClient' entry in the JAAS configuration. System property
> 'java.security.auth.login.config' is not set  at
> org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
> 2)这是Flink配置里的jaas.conf
> 3)pyspark对接kafka写hdfs没有问题,这是spark的jaas.conf
> 3.1)这是spark配置里的jaas.conf
>
> 3.2)这是spark配置里的jaas-zk.conf
>
>
> 是不是要修改这个文件呢?百度给出的都是java的修正,不知道这个需要如何修改。
>
>
> ------------------ 原始邮件 ------------------
> *发件人:* "user-zh" <ac...@gmail.com>;
> *发送时间:* 2021年1月30日(星期六) 下午3:49
> *收件人:* "user-zh"<us...@flink.apache.org>;
> *主题:* Re: 问题求助(Pyflink)
>
> 你好,
> 可以看下source task所在task manager 的日志里看看 consumer 有没有成功获取到kafka
> partition相关meta信息和认证相关是否成功的信息。
>
> 瞿叶奇 <38...@qq.com> 于2021年1月30日周六 下午3:14写道:
>
> > 老师,你好,消费是没有任何问题,可以正常消费。
> >
> >
> >
> >
> > ------------------ 原始邮件 ------------------
> > *发件人:* "user-zh" <ap...@163.com>;
> > *发送时间:* 2021年1月30日(星期六) 下午3:08
> > *收件人:* "user-zh"<us...@flink.apache.org>;
> > *主题:* Re:问题求助(Pyflink)
> >
> > 先看下kafka能否通过命令行消费数据.
> >
> > 命令行检查确保能消费,再使用Flink.
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2021-01-30 14:25:57,"瞿叶奇" <38...@qq.com> 写道:
> >
> >
> 老师,您好,我想通过Flink消费kafka写本地csv文件,目前遇到的问题是Flink、Kafka都是kerberos认证的集群,而且我是用的是pyflink,现在程序能执行,不报错,但是不消费数据,csv文件没有结果,但是文件日期修改日期一直在更新。怀疑对接kafka
> > 存在问题,希望老师能够给解决疑惑。
> > 1)Kafka生产数据:
> >
> > 2)pyflink 程序
> >
> >
> > #!/usr/bin/python3.7
> > # -*- coding: UTF-8 -*-
> > from pyflink.datastream import StreamExecutionEnvironment,
> > CheckpointingMode
> > from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes,
> > CsvTableSink, WriteMode, SqlDialect
> > from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json
> > s_env = StreamExecutionEnvironment.get_execution_environment()
> > s_env.set_parallelism(1)
> > s_env.enable_checkpointing(3000)
> > st_env = StreamTableEnvironment.create(s_env, TableConfig())
> > st_env.use_catalog("default_catalog")
> > st_env.use_database("default_database")
> >
> st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect",
> > "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002
> ").property("security.protocol",
> > 'SASL_PLAINTEXT').property("sasl.kerberos.service.name",
> > 'kafka').property("kerberos.domain.name", 'hadoop.hadoop.com
> ').property("bootstrap.servers",
> > "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007
> ")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id",
> > DataTypes.BIGINT()),DataTypes.FIELD("name",
> > DataTypes.STRING())]))).with_schema(Schema().field("id",
> > DataTypes.BIGINT()).field("name",
> > DataTypes.STRING())).register_table_source("sourceKafka")
> > fieldNames = ["id", "name"]
> > fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()]
> > csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv",
> ",",
> > 1, WriteMode.OVERWRITE)
> > st_env.register_table_sink("csvTableSink", csvSink)
> > resultQuery = st_env.sql_query("select id,name from sourceKafka")
> > resultQuery = resultQuery.insert_into("csvTableSink")
> > st_env.execute("pyflink-kafka-v2")
> > 3)pyflink-shell.sh local
> >
> > 4)运行结果
> > 在pyflink-shell local运行程序的同时,启用生产者生产数据,查看结果文件如下:
> >
> >
> >
> 可以看出文件确实在被更新,文件的修改时间在变化,但是里面是空的,一方面希望pyflink可以增加打印到控制台的功能,一方面希望老师能给出对接kerberos认证的kafka的案例,我是陕西国网用电信息采集系统新架构改造的工程师,我们计划使用flink测试kafka-hdfs数据的数据传输。希望老师能给出一个案例,帮助我们完成测试。
> >
> >
> >
> >
>

Re: 问题求助(Pyflink)

Posted by Shuiqiang Chen <ac...@gmail.com>.
你好,
可以看下source task所在task manager 的日志里看看 consumer 有没有成功获取到kafka
partition相关meta信息和认证相关是否成功的信息。

瞿叶奇 <38...@qq.com> 于2021年1月30日周六 下午3:14写道:

> 老师,你好,消费是没有任何问题,可以正常消费。
>
>
>
>
> ------------------ 原始邮件 ------------------
> *发件人:* "user-zh" <ap...@163.com>;
> *发送时间:* 2021年1月30日(星期六) 下午3:08
> *收件人:* "user-zh"<us...@flink.apache.org>;
> *主题:* Re:问题求助(Pyflink)
>
> 先看下kafka能否通过命令行消费数据.
>
> 命令行检查确保能消费,再使用Flink.
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-01-30 14:25:57,"瞿叶奇" <38...@qq.com> 写道:
>
> 老师,您好,我想通过Flink消费kafka写本地csv文件,目前遇到的问题是Flink、Kafka都是kerberos认证的集群,而且我是用的是pyflink,现在程序能执行,不报错,但是不消费数据,csv文件没有结果,但是文件日期修改日期一直在更新。怀疑对接kafka
> 存在问题,希望老师能够给解决疑惑。
> 1)Kafka生产数据:
>
> 2)pyflink 程序
>
>
> #!/usr/bin/python3.7
> # -*- coding: UTF-8 -*-
> from pyflink.datastream import StreamExecutionEnvironment,
> CheckpointingMode
> from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes,
> CsvTableSink, WriteMode, SqlDialect
> from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json
> s_env = StreamExecutionEnvironment.get_execution_environment()
> s_env.set_parallelism(1)
> s_env.enable_checkpointing(3000)
> st_env = StreamTableEnvironment.create(s_env, TableConfig())
> st_env.use_catalog("default_catalog")
> st_env.use_database("default_database")
> st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect",
> "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002").property("security.protocol",
> 'SASL_PLAINTEXT').property("sasl.kerberos.service.name",
> 'kafka').property("kerberos.domain.name", 'hadoop.hadoop.com').property("bootstrap.servers",
> "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id",
> DataTypes.BIGINT()),DataTypes.FIELD("name",
> DataTypes.STRING())]))).with_schema(Schema().field("id",
> DataTypes.BIGINT()).field("name",
> DataTypes.STRING())).register_table_source("sourceKafka")
> fieldNames = ["id", "name"]
> fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()]
> csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",",
> 1, WriteMode.OVERWRITE)
> st_env.register_table_sink("csvTableSink", csvSink)
> resultQuery = st_env.sql_query("select id,name from sourceKafka")
> resultQuery = resultQuery.insert_into("csvTableSink")
> st_env.execute("pyflink-kafka-v2")
> 3)pyflink-shell.sh local
>
> 4)运行结果
> 在pyflink-shell local运行程序的同时,启用生产者生产数据,查看结果文件如下:
>
>
> 可以看出文件确实在被更新,文件的修改时间在变化,但是里面是空的,一方面希望pyflink可以增加打印到控制台的功能,一方面希望老师能给出对接kerberos认证的kafka的案例,我是陕西国网用电信息采集系统新架构改造的工程师,我们计划使用flink测试kafka-hdfs数据的数据传输。希望老师能给出一个案例,帮助我们完成测试。
>
>
>
>