Posted to user@flink.apache.org by "ivan.rosero@agilent.com" <iv...@agilent.com> on 2022/06/10 11:40:13 UTC

custom table source, how to support json?

Hello,

I have a Flink table source working with the following DDL:

"""
    create table source (
        ts TIMESTAMP(3),
        log_line STRING,
        WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
    ) with (
        'connector'='lokitail', 'query'='blah', 'url'='blah'
    )
""")

It uses a simple custom table source, which collects rows like this:

            ctx.collect(GenericRowData.of(
                TimestampData.fromEpochMillis(Instant.now().toEpochMilli()),
                StringData.fromString("field0_counter_" + count++))
            );

I would like, instead, to just send a single JSON string, like:

            ctx.collect(GenericRowData.of(
                StringData.fromString("{\"value\" : \"field0_counter_" + count++ + "\", \"ts\":\"" + Instant.now().toEpochMilli() + "\"}"))
            );

and handle the parsing in PyFlink. Can this be done simply at the point where the row data is collected?

Thank you,

Ivan

Re: custom table source, how to support json?

Posted by Dian Fu <di...@gmail.com>.
Hi Ivan,

Is your question how to parse the JSON string in PyFlink? If so, maybe you
could take a look at this [1].

Regards,
Dian

[1]
https://stackoverflow.com/questions/71820015/how-to-reference-nested-json-within-pyflink-sql-when-json-schema-varies
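
For illustration, here is a minimal sketch of the parsing step in plain Python, assuming the payload has exactly the shape shown in the question (a "value" string and a "ts" string holding epoch milliseconds). The function name parse_log_line is hypothetical, and the choice to return (None, None) for malformed input is an assumption, not something from the thread. Note also that if the source collects a single StringData field, the table would presumably need to declare a single STRING column to match.

```python
import json
from datetime import datetime, timezone


def parse_log_line(log_line):
    """Parse one JSON payload from the source into a (ts, value) pair.

    Returns (None, None) for malformed input so that a single bad
    record does not raise (an assumption made for this sketch).
    """
    try:
        payload = json.loads(log_line)
        # The source emits "ts" as a string of epoch milliseconds.
        epoch_millis = int(payload["ts"])
        value = payload["value"]
    except (ValueError, KeyError, TypeError):
        return None, None
    ts = datetime.fromtimestamp(epoch_millis / 1000.0, tz=timezone.utc)
    return ts, value


ts, value = parse_log_line('{"value" : "field0_counter_7", "ts":"1654861213000"}')
print(ts, value)
```

In a PyFlink job, a function like this could be wrapped with the udf decorator from pyflink.table.udf (with a suitable result_type) and applied to the STRING column. As far as I know, Flink 1.15 and later also offer a built-in JSON_VALUE SQL function that may cover simple cases without a Python UDF.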
