You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Xingbo Huang <hx...@gmail.com> on 2020/05/29 08:48:34 UTC
Fwd: pyflink Table Api连接 外部系统问题
---------- Forwarded message ---------
发件人: Xingbo Huang <hx...@gmail.com>
Date: 2020年5月29日周五 下午4:30
Subject: Re: pyflink Table Api连接 外部系统问题
To: 刘亚坤 <ws...@163.com>
你好,
你想问的应该是如何把kafka里面的一整个json数据当成一个string读进来,然后不做任何format解析对吧。如果是这样的话,我的理解是,你首先不能用json
format,需要使用csv
format,然后你得指定一个field_delimiter,默认的是逗号,你得换一个,比如\n,要不然就会把你的json字符串数据按照都厚给切分开了。我刚刚用descriptor试验了一下,没有问题。你可以试试。
下面是我整个PyFlink读取json串进来然后解析数据中time字段的作业
def str_func(str_param):
import json
return json.loads(str_param)['time']
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
st_env = StreamTableEnvironment.create(s_env)
result_file = "/tmp/slide_row_window_streaming.csv"
if os.path.exists(result_file):
os.remove(result_file)
st_env \
.connect( # declare the external system to connect to
Kafka()
.version("0.11")
.topic("user")
.start_from_earliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
) \
.with_format( # declare a format for this system
Csv()
.schema(DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.STRING())
]))
.field_delimiter('\n')
) \
.with_schema( # declare the schema of the table
Schema()
.field("a", DataTypes.STRING())
) \
.in_append_mode() \
.register_table_source("source")
st_env.register_function(
"str_func", udf(str_func, [DataTypes.STRING()], DataTypes.STRING()))
st_env.register_table_sink("sink",
CsvTableSink(["a"],
[DataTypes.STRING()],
result_file))
st_env.scan("source").select("str_func(a)").insert_into("sink")
kafka里面的数据
{"a": "a", "b": 1, "c": 1, "time": "2013-01-01T00:14:13Z"}
{"a": "b", "b": 2, "c": 2, "time": "2013-01-01T00:24:13Z"}
{"a": "a", "b": 3, "c": 3, "time": "2013-01-01T00:34:13Z"}
{"a": "a", "b": 4, "c": 4, "time": "2013-01-01T01:14:13Z"}
{"a": "b", "b": 4, "c": 5, "time": "2013-01-01T01:24:13Z"}
{"a": "a", "b": 5, "c": 2, "time": "2013-01-01T01:34:13Z"}
最后Csv里面的结果数据为
2013-01-01T00:14:13Z
2013-01-01T00:24:13Z
2013-01-01T00:34:13Z
2013-01-01T01:14:13Z
2013-01-01T01:24:13Z
2013-01-01T01:34:13Z
Best,
Xingbo
刘亚坤 <ws...@163.com> 于2020年5月29日周五 下午2:17写道:
> 目前在学习使用pyflink的Table api,请教一个问题:
> 1、Table Api连接kafka系统,能否把整条的kafka消息看成是一个table字段进行处理?比如,kafka
> topic连的消息为一个json字符串,把这个字符串整体当做是一个字段,这样可以方便使用 pyflink 的udf函数对消息进行处理转换等操作?
> 2、如果以上可行,连接kafka的数据格式如何设置,即with_format如何设置,目前官网这方便的资料较少。
>
> 新手入门,请多指教,感谢。
>