You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "ivan.rosero@agilent.com" <iv...@agilent.com> on 2022/06/10 11:40:13 UTC
custom table source, how to support json?
Hello,
I have a flink table source working using
"""
create table source (
ts TIMESTAMP(3),
log_line STRING,
WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) with (
'connector'='lokitail', 'query'='blah', 'url'='blah'
)
""")
It uses a simple custom table source, which collects rows like this:
ctx.collect(GenericRowData.of(
TimestampData.fromEpochMillis(Instant.now().toEpochMilli()),
StringData.fromString("field0_counter_" + count++))
)
I would like, instead, to just send a single JSON string, like:
ctx.collect(GenericRowData.of(
StringData.fromString("{\"value\" : \"field0_counter_" + count++ + "\", \"ts\":\"" + Instant.now().toEpochMilli() + "\"}"))
);
And handle parsing in python flink. Can this be done simply at the point of collecting the row data?
Thank you,
Ivan
Re: custom table source, how to support json?
Posted by Dian Fu <di...@gmail.com>.
Hi Ivan,
Is your question how to parse the JSON string in PyFlink? If so, maybe you
could take a look at this [1].
Regards,
Dian
[1]
https://stackoverflow.com/questions/71820015/how-to-reference-nested-json-within-pyflink-sql-when-json-schema-varies
On Fri, Jun 10, 2022 at 7:40 PM ivan.rosero@agilent.com <
ivan.rosero@agilent.com> wrote:
> Hello,
>
>
>
> I have a flink table source working using
>
>
>
> """
>
> create table source (
>
> ts TIMESTAMP(3),
>
> log_line STRING,
>
> WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
>
> ) with (
>
> 'connector'='lokitail', 'query'='blah', 'url'='blah'
>
> )
>
> """)
>
>
>
> It uses a simple custom table source, which collects rows like this:
>
>
>
> ctx.collect(GenericRowData.of(
>
>
> TimestampData.fromEpochMillis(Instant.now().toEpochMilli()),
>
> StringData.fromString("field0_counter_" + count++))
>
> )
>
>
>
> I would like, instead, to just send a single JSON string, like:
>
>
>
> ctx.collect(GenericRowData.of(
>
> StringData.fromString("{\"value\" : \"field0_counter_" +
> count++ + "\", \"ts\":\"" + Instant.now().toEpochMilli() + "\"}"))
>
> );
>
>
>
> And handle parsing in python flink. Can this be done simply at the point
> of collecting the row data?
>
>
>
> Thank you,
>
>
> Ivan
>