Posted to user@flink.apache.org by Anyang Hu <hu...@gmail.com> on 2019/10/29 09:59:51 UTC

[FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?

Hi guys,

In Flink 1.9, we can set `connector.type` to `kafka` and `format.type` to
`json` to read JSON data from Kafka or write JSON data to Kafka.

In my scenario, I would like to read local JSON data as a source table, since I
need to debug locally and don't want to consume online Kafka data.

For example:

> create table source (
> first varchar,
> id int
> ) with (
> 'connector.type' = 'filesystem',
> 'connector.path' = '/path/to/json',
> 'format.type' = 'json'
> )
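
For local debugging, the file behind `connector.path` has to be produced somehow. The following is a minimal sketch of generating such a file for the example table above (`first varchar`, `id int`). It assumes one JSON object per line, which is an assumption about the file layout and not something the thread specifies; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for producing local test data; not part of Flink.
class LocalJsonFixture {

    // Renders one row of the example table (first varchar, id int) as a JSON object.
    // Only backslashes and double quotes are escaped, which is enough for simple test strings.
    public static String toJsonLine(String first, int id) {
        String escaped = first.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"first\": \"" + escaped + "\", \"id\": " + id + "}";
    }

    // Writes one JSON object per line (assumed layout) and returns the lines written.
    // Ids are assigned sequentially starting at 1.
    public static List<String> writeFixture(Path target, String[] names) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            lines.add(toJsonLine(names[i], i + 1));
        }
        try {
            Files.write(target, lines, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        Path target = Files.createTempFile("source-", ".json");
        writeFixture(target, new String[] {"alice", "bob"});
        System.out.println("wrote " + target);
    }
}
```

The printed path could then be used as the value of `connector.path` once a JSON format is available for the filesystem connector.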


In addition, writing local json data is also needed.

Does anyone have similar needs?

Best regards,
Anyang

Re: [FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?

Posted by Jingsong Li <ji...@gmail.com>.
Hi Anyang,

For your information: I plan to support the JSON format in the filesystem connector
after https://issues.apache.org/jira/browse/FLINK-14256
After FLIP-66 [1], we can define time attributes in SQL DDL whatever
the connector is.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL


-- 
Best, Jingsong Lee

Re: [FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?

Posted by Anyang Hu <hu...@gmail.com>.
Hi,

Thanks Dawid and Florin.

To Dawid:

CsvTableSource doesn't implement the DefinedProctimeAttribute and
DefinedRowtimeAttributes interfaces, so we cannot use proctime and
rowtime in the source DDL. Besides CSV, we also need to consume JSON and pb
data.


To Florin:
Installing Kafka and ZooKeeper locally introduces too many third-party
components and may not be universally applicable.

In my scenario, I need to run a SQL job locally to debug it (for example, source
and sink are kafka-json, and the dimension table is jdbc) before submitting it to
YARN. The following usage is what I want:
1) generate local JSON data for the source and dimension tables (the source table
   supports proctime and rowtime);
2) change `connector.type` to 'filesystem';
3) add `connector.path` to the DDL properties of the source and dimension tables;
4) the new SQL can then run locally as if the data were read from Kafka and JDBC.
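
Steps 2 and 3 could be sketched roughly as follows. This only rewrites the property map of a table definition; it assumes that every other `connector.*` key is specific to the original connector and can be dropped, and it does not make the filesystem connector accept JSON. The class, method names, and the topic name are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that derives local-debug table properties; not part of Flink.
class LocalDebugProperties {

    // Returns a copy of the WITH properties that points at a local path.
    // All original 'connector.*' keys (topic, brokers, ...) are dropped because they
    // belong to the original connector; 'format.*' and any other keys are kept.
    public static Map<String, String> toFilesystem(Map<String, String> original, String localPath) {
        Map<String, String> rewritten = new LinkedHashMap<>();
        rewritten.put("connector.type", "filesystem");
        rewritten.put("connector.path", localPath);
        for (Map.Entry<String, String> e : original.entrySet()) {
            if (!e.getKey().startsWith("connector.")) {
                rewritten.put(e.getKey(), e.getValue());
            }
        }
        return rewritten;
    }

    // Renders the properties as the body of a DDL WITH (...) clause, one property per line.
    public static String toWithClause(Map<String, String> props) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (sb.length() > 0) {
                sb.append(",\n");
            }
            sb.append("'").append(e.getKey()).append("' = '").append(e.getValue()).append("'");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> kafka = new LinkedHashMap<>();
        kafka.put("connector.type", "kafka");
        kafka.put("connector.topic", "orders"); // hypothetical topic name
        kafka.put("format.type", "json");
        System.out.println(toWithClause(toFilesystem(kafka, "/path/to/json")));
    }
}
```

Keeping the rewrite in one place means the production DDL stays untouched and only the property set differs between the local and the YARN run.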

Thanks,
Anyang

Re: [FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?

Posted by Spico Florin <sp...@gmail.com>.
Hi!

Another solution would be to install Kafka and ZooKeeper locally and push the
JSON data you dumped from the production server into a topic (you create a Kafka
producer for that).
Then you configure your code to point to this local broker and consume the
data from the topic with whichever offset strategy you need (earliest or latest).
The advantage is that you can repeat your tests multiple times, as in a real
scenario.

Depending on your use case, your processing pipeline may behave differently
when you consume from a file (batch) than when you consume from a stream
(Kafka).
I ran into this kind of issue with some CEP functionality.
I hope it helps.
Regards,
Florin


Re: [FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,

Unfortunately it is not possible out of the box. The only format that
the filesystem connector supports as of now is CSV.

As a workaround you could create a Table out of a DataStream reusing the
JsonRowDeserializationSchema. Have a look at the example below:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // raw JSON records as bytes
        DataStream<byte[]> input = env.fromElements(
            "{\"lon\": 123.23, \"rideTime\": \"2019\", \"obj\": {\"numb\": 1234}}".getBytes()
        ); // or read from file record by record

        // the schema of the rows is passed to the builder (elided here)
        JsonRowDeserializationSchema jsonSchema = new JsonRowDeserializationSchema.Builder(...).build();

        // deserialize each record into a Row and declare the produced type explicitly
        TypeInformation<Row> producedType = jsonSchema.getProducedType();
        SingleOutputStreamOperator<Row> in = input.map(jsonSchema::deserialize)
            .returns(producedType);

        // register the stream as table "t" so it can be queried with SQL
        tEnv.registerDataStream("t", in);

        Table table = tEnv.sqlQuery("SELECT * FROM t");
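
The "read from file record by record" option mentioned in the comment above could look roughly like the sketch below. It uses only the JDK and assumes the file holds one JSON object per line; the class and method names are hypothetical. The resulting list could presumably be handed to the job in place of the `fromElements(...)` call, for example through `env.fromCollection(records)`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for loading local JSON records; not part of Flink.
class JsonFileRecords {

    // Turns lines of text into byte[] records (UTF-8), skipping blank lines.
    public static List<byte[]> toRecords(List<String> lines) {
        List<byte[]> records = new ArrayList<>();
        for (String line : lines) {
            if (!line.trim().isEmpty()) {
                records.add(line.getBytes(StandardCharsets.UTF_8));
            }
        }
        return records;
    }

    // Reads a file that is assumed to hold one JSON object per line.
    public static List<byte[]> readRecords(Path file) {
        try {
            return toRecords(Files.readAllLines(file, StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: JsonFileRecords <path-to-json-lines-file>");
            return;
        }
        List<byte[]> records = readRecords(Paths.get(args[0]));
        System.out.println(records.size() + " records read");
    }
}
```

Loading the whole file into memory is only reasonable for small debug fixtures; larger dumps would need a streaming source.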

Best,

Dawid
