You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by jack <ws...@163.com> on 2020/06/01 09:31:30 UTC
pyflink 使用udf函数 处理成json字符串发送到kafka的格式问题
请教各位,我这边使用pyflink 消费kafka json字符串数据发送到kafka, 把kafka消息按照一个字段来处理,
数据输入:
{"topic": "logSource", "message": "x=1,y=1,z=1"}
发送到kafka里面的数据结果如下:
"{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"
又被双引号包了一层, 我的udf函数中使用的是 json模块进行处理,请各位帮我看一下怎么样转成正常的json串。
@udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING())
defkv(log, pair_sep=',', kv_sep='='):
import json
log = json.loads(log)
ret = {}
items = re.split(pair_sep, log.get("message"))
for item in items:
ret[re.split(kv_sep, item)[0]] = re.split(kv_sep, item)[1]
log.update(ret)
log = json.dumps(log)
return log
defregister_source(st_env):
st_env \
.connect( # declare the external system to connect to
Kafka()
.version("0.10")
.topic("logSource")
.start_from_latest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")) \
.with_format( # declare a format for this system
Csv()
.schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
.field_delimiter("\n")) \
.with_schema( # declare the schema of the table
Schema()
.field("log", DataTypes.STRING())) \
.in_append_mode() \
.register_table_source("source")
defregister_sink(st_env):
st_env.connect(
Kafka()
.version("0.10")
.topic("logSink")
.start_from_earliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")) \
.with_format( # declare a format for this system
Csv()
.schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))) \
.with_schema( # declare the schema of the table
Schema()
.field("log", DataTypes.STRING())) \
.in_append_mode() \
.register_table_sink("sink")
if __name__ == '__main__':
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
s_env.set_parallelism(1)
st_env = StreamTableEnvironment \
.create(s_env, environment_settings=EnvironmentSettings
.new_instance()
.in_streaming_mode()
.use_blink_planner().build())
st_env.register_function('e_kv', e_kv)
register_source(st_env)
register_sink(st_env)
st_env \
.from_path("source") \
.select("kv(log,',', '=') as log") \
.insert_into("sink") \
st_env.execute("test")
Re:Re: pyflink数据查询
Posted by jack <ws...@163.com>.
感谢您的建议,目前在学习使用pyflink,使用pyflink做各种有趣的尝试,包括udf函数做日志解析等,也看过
目前官方文档对于pyflink的文档和例子还是偏少,遇到问题了还是需要向各位大牛们多多请教。
Best,
Jack
在 2020-06-15 16:13:32,"jincheng sun" <su...@gmail.com> 写道:
>你好 Jack,
>
>> pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,
>我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询
>
>我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景:
>1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。
>2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的
>【直接作为结果】这句话就要描述一下,您想怎样作为结果?我理解是要落盘的(持久化),所以这里持久化本质也是一个sink。Flink可以支持很多中sink,比如:数据库,文件系统,消息队列等等。您可以参考官方文档:
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
>
>如果上面回复 没有解决你的问题,欢迎随时反馈~~
>
>Best,
>Jincheng
>
>
>
>Jeff Zhang <zj...@gmail.com> 于2020年6月9日周二 下午5:39写道:
>
>> 可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程
>> https://www.bilibili.com/video/BV1Te411W73b?p=20
>> 可以加入钉钉群讨论:30022475
>>
>>
>>
>> jack <ws...@163.com> 于2020年6月9日周二 下午5:28写道:
>>
>>> 问题请教:
>>> 描述: pyflink 从source通过sql对数据进行查询聚合等操作
>>> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
>>>
>>> flink能否实现这样的方式?
>>> 感谢
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
Re: pyflink数据查询
Posted by jack <ws...@163.com>.
hi
感谢您的建议,我这边尝试一下自定义实现sink的方式。
Best,
Jack
在 2020-06-15 18:08:15,"godfrey he" <go...@gmail.com> 写道:
hi jack,jincheng
Flink 1.11 支持直接将select的结果collect到本地,例如:
CloseableIterator<Row> it = tEnv.executeSql("select ...").collect();
while(it.hasNext()) {
it.next() ....
}
但是 pyflink 还没有引入 collect() 接口。(后续会完善?@jincheng)
但是1.11的TableResult#collect实现对流的query支持不完整(只支持append only的query),master已经完整支持。
可以参照 jincheng 的意见,(或者结合 TableResult#collect 的实现),完成一个自己的 sink 也可以。
Best,
Godfrey
jincheng sun <su...@gmail.com> 于2020年6月15日周一 下午4:14写道:
你好 Jack,
> pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询
我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景:
1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。
2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的 【直接作为结果】这句话就要描述一下,您想怎样作为结果?我理解是要落盘的(持久化),所以这里持久化本质也是一个sink。Flink可以支持很多中sink,比如:数据库,文件系统,消息队列等等。您可以参考官方文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
如果上面回复 没有解决你的问题,欢迎随时反馈~~
Best,
Jincheng
Jeff Zhang <zj...@gmail.com> 于2020年6月9日周二 下午5:39写道:
可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程 https://www.bilibili.com/video/BV1Te411W73b?p=20
可以加入钉钉群讨论:30022475
jack <ws...@163.com> 于2020年6月9日周二 下午5:28写道:
问题请教:
描述: pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
flink能否实现这样的方式?
感谢
--
Best Regards
Jeff Zhang
Re: pyflink数据查询
Posted by jack <ws...@163.com>.
hi
感谢您的建议,我这边尝试一下自定义实现sink的方式。
Best,
Jack
在 2020-06-15 18:08:15,"godfrey he" <go...@gmail.com> 写道:
hi jack,jincheng
Flink 1.11 支持直接将select的结果collect到本地,例如:
CloseableIterator<Row> it = tEnv.executeSql("select ...").collect();
while(it.hasNext()) {
it.next() ....
}
但是 pyflink 还没有引入 collect() 接口。(后续会完善?@jincheng)
但是1.11的TableResult#collect实现对流的query支持不完整(只支持append only的query),master已经完整支持。
可以参照 jincheng 的意见,(或者结合 TableResult#collect 的实现),完成一个自己的 sink 也可以。
Best,
Godfrey
jincheng sun <su...@gmail.com> 于2020年6月15日周一 下午4:14写道:
你好 Jack,
> pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询
我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景:
1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。
2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的 【直接作为结果】这句话就要描述一下,您想怎样作为结果?我理解是要落盘的(持久化),所以这里持久化本质也是一个sink。Flink可以支持很多中sink,比如:数据库,文件系统,消息队列等等。您可以参考官方文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
如果上面回复 没有解决你的问题,欢迎随时反馈~~
Best,
Jincheng
Jeff Zhang <zj...@gmail.com> 于2020年6月9日周二 下午5:39写道:
可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程 https://www.bilibili.com/video/BV1Te411W73b?p=20
可以加入钉钉群讨论:30022475
jack <ws...@163.com> 于2020年6月9日周二 下午5:28写道:
问题请教:
描述: pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
flink能否实现这样的方式?
感谢
--
Best Regards
Jeff Zhang
Re: pyflink数据查询
Posted by godfrey he <go...@gmail.com>.
hi jack,jincheng
Flink 1.11 支持直接将select的结果collect到本地,例如:
CloseableIterator<Row> it = tEnv.executeSql("select ...").collect();
while(it.hasNext()) {
it.next() ....
}
但是 pyflink 还没有引入 collect() 接口。(后续会完善?@jincheng)
但是1.11的TableResult#collect实现对流的query支持不完整(只支持append
only的query),master已经完整支持。
可以参照 jincheng 的意见,(或者结合 TableResult#collect 的实现),完成一个自己的 sink 也可以。
Best,
Godfrey
jincheng sun <su...@gmail.com> 于2020年6月15日周一 下午4:14写道:
> 你好 Jack,
>
> > pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,
> 我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询
>
> 我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景:
> 1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。
> 2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的
> 【直接作为结果】这句话就要描述一下,您想怎样作为结果?我理解是要落盘的(持久化),所以这里持久化本质也是一个sink。Flink可以支持很多中sink,比如:数据库,文件系统,消息队列等等。您可以参考官方文档:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
>
> 如果上面回复 没有解决你的问题,欢迎随时反馈~~
>
> Best,
> Jincheng
>
>
>
> Jeff Zhang <zj...@gmail.com> 于2020年6月9日周二 下午5:39写道:
>
>> 可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程
>> https://www.bilibili.com/video/BV1Te411W73b?p=20
>> 可以加入钉钉群讨论:30022475
>>
>>
>>
>> jack <ws...@163.com> 于2020年6月9日周二 下午5:28写道:
>>
>>> 问题请教:
>>> 描述: pyflink 从source通过sql对数据进行查询聚合等操作
>>> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
>>>
>>> flink能否实现这样的方式?
>>> 感谢
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
Re: pyflink数据查询
Posted by godfrey he <go...@gmail.com>.
hi jack,jincheng
Flink 1.11 支持直接将select的结果collect到本地,例如:
CloseableIterator<Row> it = tEnv.executeSql("select ...").collect();
while(it.hasNext()) {
it.next() ....
}
但是 pyflink 还没有引入 collect() 接口。(后续会完善?@jincheng)
但是1.11的TableResult#collect实现对流的query支持不完整(只支持append
only的query),master已经完整支持。
可以参照 jincheng 的意见,(或者结合 TableResult#collect 的实现),完成一个自己的 sink 也可以。
Best,
Godfrey
jincheng sun <su...@gmail.com> 于2020年6月15日周一 下午4:14写道:
> 你好 Jack,
>
> > pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,
> 我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询
>
> 我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景:
> 1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。
> 2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的
> 【直接作为结果】这句话就要描述一下,您想怎样作为结果?我理解是要落盘的(持久化),所以这里持久化本质也是一个sink。Flink可以支持很多中sink,比如:数据库,文件系统,消息队列等等。您可以参考官方文档:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
>
> 如果上面回复 没有解决你的问题,欢迎随时反馈~~
>
> Best,
> Jincheng
>
>
>
> Jeff Zhang <zj...@gmail.com> 于2020年6月9日周二 下午5:39写道:
>
>> 可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程
>> https://www.bilibili.com/video/BV1Te411W73b?p=20
>> 可以加入钉钉群讨论:30022475
>>
>>
>>
>> jack <ws...@163.com> 于2020年6月9日周二 下午5:28写道:
>>
>>> 问题请教:
>>> 描述: pyflink 从source通过sql对数据进行查询聚合等操作
>>> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
>>>
>>> flink能否实现这样的方式?
>>> 感谢
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
Re: pyflink数据查询
Posted by jincheng sun <su...@gmail.com>.
你好 Jack,
> pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,
我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询
我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景:
1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。
2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的
【直接作为结果】这句话就要描述一下,您想怎样作为结果?我理解是要落盘的(持久化),所以这里持久化本质也是一个sink。Flink可以支持很多中sink,比如:数据库,文件系统,消息队列等等。您可以参考官方文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
如果上面回复 没有解决你的问题,欢迎随时反馈~~
Best,
Jincheng
Jeff Zhang <zj...@gmail.com> 于2020年6月9日周二 下午5:39写道:
> 可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程
> https://www.bilibili.com/video/BV1Te411W73b?p=20
> 可以加入钉钉群讨论:30022475
>
>
>
> jack <ws...@163.com> 于2020年6月9日周二 下午5:28写道:
>
>> 问题请教:
>> 描述: pyflink 从source通过sql对数据进行查询聚合等操作
>> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
>>
>> flink能否实现这样的方式?
>> 感谢
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
Re: pyflink数据查询
Posted by jincheng sun <su...@gmail.com>.
你好 Jack,
> pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,
我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询
我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景:
1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。
2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的
【直接作为结果】这句话就要描述一下,您想怎样作为结果?我理解是要落盘的(持久化),所以这里持久化本质也是一个sink。Flink可以支持很多中sink,比如:数据库,文件系统,消息队列等等。您可以参考官方文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
如果上面回复 没有解决你的问题,欢迎随时反馈~~
Best,
Jincheng
Jeff Zhang <zj...@gmail.com> 于2020年6月9日周二 下午5:39写道:
> 可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程
> https://www.bilibili.com/video/BV1Te411W73b?p=20
> 可以加入钉钉群讨论:30022475
>
>
>
> jack <ws...@163.com> 于2020年6月9日周二 下午5:28写道:
>
>> 问题请教:
>> 描述: pyflink 从source通过sql对数据进行查询聚合等操作
>> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
>>
>> flink能否实现这样的方式?
>> 感谢
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
Re: pyflink数据查询
Posted by Jeff Zhang <zj...@gmail.com>.
可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程
https://www.bilibili.com/video/BV1Te411W73b?p=20
可以加入钉钉群讨论:30022475
jack <ws...@163.com> 于2020年6月9日周二 下午5:28写道:
> 问题请教:
> 描述: pyflink 从source通过sql对数据进行查询聚合等操作
> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
>
> flink能否实现这样的方式?
> 感谢
>
--
Best Regards
Jeff Zhang
Re: pyflink数据查询
Posted by Jeff Zhang <zj...@gmail.com>.
可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程
https://www.bilibili.com/video/BV1Te411W73b?p=20
可以加入钉钉群讨论:30022475
jack <ws...@163.com> 于2020年6月9日周二 下午5:28写道:
> 问题请教:
> 描述: pyflink 从source通过sql对数据进行查询聚合等操作
> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
>
> flink能否实现这样的方式?
> 感谢
>
--
Best Regards
Jeff Zhang
pyflink数据查询
Posted by jack <ws...@163.com>.
问题请教:
描述: pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
flink能否实现这样的方式?
感谢
Re: pyflink 使用udf函数 处理成json字符串发送到kafka的格式问题
Posted by Xingbo Huang <hx...@gmail.com>.
客气客气,互相交流学习😀
Best,
Xingbo
jack <ws...@163.com> 于2020年6月1日周一 下午9:07写道:
> 非常感谢解惑,刚开始使用pyflink,不是很熟悉,还请多多指教
>
>
>
>
>
>
> 在 2020-06-01 20:50:53,"Xingbo Huang" <hx...@gmail.com> 写道:
>
> Hi,
> 其实这个是CSV connector的一个可选的
> quote_character参数的效果,默认值是",所以你每个字段都会这样外面包一层,你可以配置一下这个参数为\0,就可以得到你要的效果了。
> st_env.connect(
> Kafka()
> .version("0.11")
> .topic("logSink")
> .start_from_earliest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092")) \
> .with_format( # declare a format for this system
> Csv()
> .schema(DataTypes.ROW([DataTypes.FIELD("log",
> DataTypes.STRING())]))
> .quote_character("\0")
> ) \
> .with_schema( # declare the schema of the table
> Schema()
> .field("log", DataTypes.STRING())) \
> .in_append_mode() \
> .register_table_sink("sink")
>
> Best,
> Xingbo
>
>>
>>
>>
Re: pyflink 使用udf函数 处理成json字符串发送到kafka的格式问题
Posted by Xingbo Huang <hx...@gmail.com>.
客气客气,互相交流学习😀
Best,
Xingbo
jack <ws...@163.com> 于2020年6月1日周一 下午9:07写道:
> 非常感谢解惑,刚开始使用pyflink,不是很熟悉,还请多多指教
>
>
>
>
>
>
> 在 2020-06-01 20:50:53,"Xingbo Huang" <hx...@gmail.com> 写道:
>
> Hi,
> 其实这个是CSV connector的一个可选的
> quote_character参数的效果,默认值是",所以你每个字段都会这样外面包一层,你可以配置一下这个参数为\0,就可以得到你要的效果了。
> st_env.connect(
> Kafka()
> .version("0.11")
> .topic("logSink")
> .start_from_earliest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092")) \
> .with_format( # declare a format for this system
> Csv()
> .schema(DataTypes.ROW([DataTypes.FIELD("log",
> DataTypes.STRING())]))
> .quote_character("\0")
> ) \
> .with_schema( # declare the schema of the table
> Schema()
> .field("log", DataTypes.STRING())) \
> .in_append_mode() \
> .register_table_sink("sink")
>
> Best,
> Xingbo
>
>>
>>
>>
pyflink数据查询
Posted by jack <ws...@163.com>.
问题请教:
描述: pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
flink能否实现这样的方式?
感谢
Re: pyflink 使用udf函数 处理成json字符串发送到kafka的格式问题
Posted by jack <ws...@163.com>.
非常感谢解惑,刚开始使用pyflink,不是很熟悉,还请多多指教
在 2020-06-01 20:50:53,"Xingbo Huang" <hx...@gmail.com> 写道:
Hi,
其实这个是CSV connector的一个可选的quote_character参数的效果,默认值是",所以你每个字段都会这样外面包一层,你可以配置一下这个参数为\0,就可以得到你要的效果了。
st_env.connect(
Kafka()
.version("0.11")
.topic("logSink")
.start_from_earliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")) \
.with_format( # declare a format for this system
Csv()
.schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
.quote_character("\0")
) \
.with_schema( # declare the schema of the table
Schema()
.field("log", DataTypes.STRING())) \
.in_append_mode() \
.register_table_sink("sink")
Best,
Xingbo
Re: pyflink 使用udf函数 处理成json字符串发送到kafka的格式问题
Posted by jack <ws...@163.com>.
非常感谢解惑,刚开始使用pyflink,不是很熟悉,还请多多指教
在 2020-06-01 20:50:53,"Xingbo Huang" <hx...@gmail.com> 写道:
Hi,
其实这个是CSV connector的一个可选的quote_character参数的效果,默认值是",所以你每个字段都会这样外面包一层,你可以配置一下这个参数为\0,就可以得到你要的效果了。
st_env.connect(
Kafka()
.version("0.11")
.topic("logSink")
.start_from_earliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")) \
.with_format( # declare a format for this system
Csv()
.schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
.quote_character("\0")
) \
.with_schema( # declare the schema of the table
Schema()
.field("log", DataTypes.STRING())) \
.in_append_mode() \
.register_table_sink("sink")
Best,
Xingbo
Re: pyflink 使用udf函数 处理成json字符串发送到kafka的格式问题
Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
其实这个是CSV connector的一个可选的
quote_character参数的效果,默认值是",所以你每个字段都会这样外面包一层,你可以配置一下这个参数为\0,就可以得到你要的效果了。
st_env.connect(
Kafka()
.version("0.11")
.topic("logSink")
.start_from_earliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")) \
.with_format( # declare a format for this system
Csv()
.schema(DataTypes.ROW([DataTypes.FIELD("log",
DataTypes.STRING())]))
.quote_character("\0")
) \
.with_schema( # declare the schema of the table
Schema()
.field("log", DataTypes.STRING())) \
.in_append_mode() \
.register_table_sink("sink")
Best,
Xingbo
jack <ws...@163.com> 于2020年6月1日周一 下午5:31写道:
> *请教各位,我这边使用pyflink 消费kafka json字符串数据发送到kafka, 把kafka消息按照一个字段来处理,*
>
> *数据输入:*
> {"topic": "logSource", "message": "x=1,y=1,z=1"}
>
> 发送到kafka里面的数据结果如下:
> "{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"
>
> *又被双引号包了一层, 我的udf函数中使用的是 json模块进行处理,请各位帮我看一下怎么样转成正常的json串。*
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING())
> def kv(log, pair_sep=',', kv_sep='='):
> import json
> log = json.loads(log)
> ret = {}
> items = re.split(pair_sep, log.get("message"))
> for item in items:
> ret[re.split(kv_sep, item)[0]] = re.split(kv_sep, item)[1]
> log.update(ret)
> log = json.dumps(log)
> return log
>
>
> def register_source(st_env):
> st_env \
> .connect( # declare the external system to connect to
> Kafka()
> .version("0.10")
> .topic("logSource")
> .start_from_latest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092")) \
> .with_format( # declare a format for this system
> Csv()
> .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
> .field_delimiter("\n")) \
> .with_schema( # declare the schema of the table
> Schema()
> .field("log", DataTypes.STRING())) \
> .in_append_mode() \
> .register_table_source("source")
>
> def register_sink(st_env):
> st_env.connect(
> Kafka()
> .version("0.10")
> .topic("logSink")
> .start_from_earliest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092")) \
> .with_format( # declare a format for this system
> Csv()
> .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))) \
> .with_schema( # declare the schema of the table
> Schema()
> .field("log", DataTypes.STRING())) \
> .in_append_mode() \
> .register_table_sink("sink")
>
> if __name__ == '__main__':
>
> s_env = StreamExecutionEnvironment.get_execution_environment()
> s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> s_env.set_parallelism(1)
> st_env = StreamTableEnvironment \
> .create(s_env, environment_settings=EnvironmentSettings
> .new_instance()
> .in_streaming_mode()
> .use_blink_planner().build())
> st_env.register_function('e_kv', e_kv)
> register_source(st_env)
> register_sink(st_env)
> st_env \
> .from_path("source") \
> .select("kv(log,',', '=') as log") \
> .insert_into("sink") \
> st_env.execute("test")
>
>
>
Re: pyflink 使用udf函数 处理成json字符串发送到kafka的格式问题
Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
其实这个是CSV connector的一个可选的
quote_character参数的效果,默认值是",所以你每个字段都会这样外面包一层,你可以配置一下这个参数为\0,就可以得到你要的效果了。
st_env.connect(
Kafka()
.version("0.11")
.topic("logSink")
.start_from_earliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")) \
.with_format( # declare a format for this system
Csv()
.schema(DataTypes.ROW([DataTypes.FIELD("log",
DataTypes.STRING())]))
.quote_character("\0")
) \
.with_schema( # declare the schema of the table
Schema()
.field("log", DataTypes.STRING())) \
.in_append_mode() \
.register_table_sink("sink")
Best,
Xingbo
jack <ws...@163.com> 于2020年6月1日周一 下午5:31写道:
> *请教各位,我这边使用pyflink 消费kafka json字符串数据发送到kafka, 把kafka消息按照一个字段来处理,*
>
> *数据输入:*
> {"topic": "logSource", "message": "x=1,y=1,z=1"}
>
> 发送到kafka里面的数据结果如下:
> "{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"
>
> *又被双引号包了一层, 我的udf函数中使用的是 json模块进行处理,请各位帮我看一下怎么样转成正常的json串。*
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING())
> def kv(log, pair_sep=',', kv_sep='='):
> import json
> log = json.loads(log)
> ret = {}
> items = re.split(pair_sep, log.get("message"))
> for item in items:
> ret[re.split(kv_sep, item)[0]] = re.split(kv_sep, item)[1]
> log.update(ret)
> log = json.dumps(log)
> return log
>
>
> def register_source(st_env):
> st_env \
> .connect( # declare the external system to connect to
> Kafka()
> .version("0.10")
> .topic("logSource")
> .start_from_latest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092")) \
> .with_format( # declare a format for this system
> Csv()
> .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
> .field_delimiter("\n")) \
> .with_schema( # declare the schema of the table
> Schema()
> .field("log", DataTypes.STRING())) \
> .in_append_mode() \
> .register_table_source("source")
>
> def register_sink(st_env):
> st_env.connect(
> Kafka()
> .version("0.10")
> .topic("logSink")
> .start_from_earliest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092")) \
> .with_format( # declare a format for this system
> Csv()
> .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))) \
> .with_schema( # declare the schema of the table
> Schema()
> .field("log", DataTypes.STRING())) \
> .in_append_mode() \
> .register_table_sink("sink")
>
> if __name__ == '__main__':
>
> s_env = StreamExecutionEnvironment.get_execution_environment()
> s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> s_env.set_parallelism(1)
> st_env = StreamTableEnvironment \
> .create(s_env, environment_settings=EnvironmentSettings
> .new_instance()
> .in_streaming_mode()
> .use_blink_planner().build())
> st_env.register_function('e_kv', e_kv)
> register_source(st_env)
> register_sink(st_env)
> st_env \
> .from_path("source") \
> .select("kv(log,',', '=') as log") \
> .insert_into("sink") \
> st_env.execute("test")
>
>
>