You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Xingbo Huang <hx...@gmail.com> on 2020/05/29 08:48:34 UTC

Fwd: pyflink Table Api连接 外部系统问题

---------- Forwarded message ---------
发件人: Xingbo Huang <hx...@gmail.com>
Date: 2020年5月29日周五 下午4:30
Subject: Re: pyflink Table Api连接 外部系统问题
To: 刘亚坤 <ws...@163.com>


你好,
你想问的应该是如何把kafka里面的一整个json数据当成一个string读进来,然后不做任何format解析对吧。如果是这样的话,我的理解是,你首先不能用json
format,需要使用csv
format,然后你得指定一个field_delimiter,默认的是逗号,你得换一个,比如\n,要不然就会把你的json字符串数据按照都厚给切分开了。我刚刚用descriptor试验了一下,没有问题。你可以试试。

下面是我整个PyFlink读取json串进来然后解析数据中time字段的作业
def str_func(str_param):
    import json
    return json.loads(str_param)['time']

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
st_env = StreamTableEnvironment.create(s_env)
result_file = "/tmp/slide_row_window_streaming.csv"
if os.path.exists(result_file):
    os.remove(result_file)
st_env \
    .connect(  # declare the external system to connect to
        Kafka()
        .version("0.11")
        .topic("user")
        .start_from_earliest()
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")
    ) \
    .with_format(  # declare a format for this system
        Csv()
            .schema(DataTypes.ROW(
            [DataTypes.FIELD("a", DataTypes.STRING())
                ]))
            .field_delimiter('\n')
     ) \
    .with_schema(  # declare the schema of the table
         Schema()
            .field("a", DataTypes.STRING())
     ) \
    .in_append_mode() \
    .register_table_source("source")

st_env.register_function(
    "str_func", udf(str_func, [DataTypes.STRING()], DataTypes.STRING()))
st_env.register_table_sink("sink",
                           CsvTableSink(["a"],
                                        [DataTypes.STRING()],
                                        result_file))

st_env.scan("source").select("str_func(a)").insert_into("sink")

kafka里面的数据
{"a": "a", "b": 1, "c": 1, "time": "2013-01-01T00:14:13Z"}
{"a": "b", "b": 2, "c": 2, "time": "2013-01-01T00:24:13Z"}
{"a": "a", "b": 3, "c": 3, "time": "2013-01-01T00:34:13Z"}
{"a": "a", "b": 4, "c": 4, "time": "2013-01-01T01:14:13Z"}
{"a": "b", "b": 4, "c": 5, "time": "2013-01-01T01:24:13Z"}
{"a": "a", "b": 5, "c": 2, "time": "2013-01-01T01:34:13Z"}

最后Csv里面的结果数据为
2013-01-01T00:14:13Z
2013-01-01T00:24:13Z
2013-01-01T00:34:13Z
2013-01-01T01:14:13Z
2013-01-01T01:24:13Z
2013-01-01T01:34:13Z

Best,
Xingbo

刘亚坤 <ws...@163.com> 于2020年5月29日周五 下午2:17写道:

> 目前在学习使用pyflink的Table api,请教一个问题:
> 1、Table Api连接kafka系统,能否把整条的kafka消息看成是一个table字段进行处理?比如,kafka
> topic连的消息为一个json字符串,把这个字符串整体当做是一个字段,这样可以方便使用 pyflink 的udf函数对消息进行处理转换等操作?
> 2、如果以上可行,连接kafka的数据格式如何设置,即with_format如何设置,目前官网这方便的资料较少。
>
> 新手入门,请多指教,感谢。
>