You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Soumen Choudhury <so...@gmail.com> on 2022/07/21 13:21:19 UTC

Flink nested complex json parsing with multiple schema file

We have a requirement of parsing a very complex json (size around 25 kb per
event) event with a predefined schema (nested schema, with multiple schema
files ) and create a temporary table and from temp table we have to apply
some case statement based some fields( eg. to find out success, failure
count , status code ) and do a aggregation in 1 sec interval.

We have tried with inbuilt *JSON_VALUE* function to retrieve some field
value and then apply the case statement, but as I am using JSON_VALUE more
than 5/6 times, the application is performing very slow.

For some other filtering use case we are able to receive more that 1600
event/sec, but for this case we are only receiving around 300 event/sec for
1 core .

Below is the query example:

*Query 1:*

"select cast(JSON_QUERY(message, '$.eventRecordHeader.Result'), bigint) AS
result1, JSON_QUERY(message, '$.eventRecordHeader.Cause.ErrorCode' ) AS
errorCode, JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause' ) AS
subCause, JSON_QUERY(message,
'$.eventRecordHeader.Cause.SubCause.SubProtocol' ) AS subProtocol,
JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause.SubError' ) AS
subError, TO_TIMESTAMP_LTZ(cast(JSON_QUERY(message,
'$.eventRecordHeader.StartTime') as bigint)/1000, 3) AS eventTime,
proctime() as proctime from kafkaJsonSource",

*Query 2:*

select count(case when result1=1 then 1 else null end)
failed_result,count(case when result1=0 then 1 else null end)
successful_result,count(case when errorCode like '4%' then 1 else null end)
err_starts_4,count(case when errorCode like '5%' then 1 else null end)
err_starts_5,count(case when errorCode like '6%' then 1 else null end)
err_starts_6,count(case when subCause is not null then 1 else null end)
has_sub_cause,count(case when subProtocol='DNS' then 1 else null end)
protocol_dns, count(case when subProtocol='Diameter' then 1 else null end)
protocol_diameter, count(case when (subProtocol='Diameter' and subError
like '3%') then 1 else null end) protocol_diameter_err_starts_3,count(case
when (subProtocol='Diameter' and subError like '4%') then 1 else null end)
protocol_diameter_err_starts_4,count(case when (subProtocol='Diameter' and
subError like '5%') then 1 else null end) protocol_diameter_err_starts_5
FROM TABLE(TUMBLE(TABLE filter_transformed, DESCRIPTOR(proctime), INTERVAL
'1' SECOND)) GROUP BY window_start, window_end;

Please someone let use know, if there is some better way to do this .

-- 
Regards
Soumen Choudhury
Cell : +91865316168
mail to : sou.tcs@gmail.com

Re: Flink nested complex json parsing with multiple schema file

Posted by Soumen Choudhury <so...@gmail.com>.
Hi Yaroslav,

Thanks for your reply.
How is performance ,

Can I have a sample code for the same.

On Thu, Jul 21, 2022, 9:31 PM Yaroslav Tkachenko <ya...@goldsky.io>
wrote:

> Hi Soumen,
>
> I'd try parsing the input using the DataStream API (with a fast JSON
> library) and then converting it to a Table.
>
> On Thu, Jul 21, 2022 at 6:22 AM Soumen Choudhury <so...@gmail.com>
> wrote:
>
>> We have a requirement of parsing a very complex json (size around 25 kb
>> per event) event with a predefined schema (nested schema, with multiple
>> schema files ) and create a temporary table and from temp table we have to
>> apply some case statement based some fields( eg. to find out success,
>> failure count , status code ) and do a aggregation in 1 sec interval.
>>
>> We have tried with inbuilt *JSON_VALUE* function to retrieve some field
>> value and then apply the case statement, but as I am using JSON_VALUE more
>> than 5/6 times, the application is performing very slow.
>>
>> For some other filtering use case we are able to receive more that 1600
>> event/sec, but for this case we are only receiving around 300 event/sec for
>> 1 core .
>>
>> Below is the query example:
>>
>> *Query 1:*
>>
>> "select cast(JSON_QUERY(message, '$.eventRecordHeader.Result'), bigint)
>> AS result1, JSON_QUERY(message, '$.eventRecordHeader.Cause.ErrorCode' )
>> AS errorCode, JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause' ) AS
>> subCause, JSON_QUERY(message,
>> '$.eventRecordHeader.Cause.SubCause.SubProtocol' ) AS subProtocol,
>> JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause.SubError' ) AS
>> subError, TO_TIMESTAMP_LTZ(cast(JSON_QUERY(message,
>> '$.eventRecordHeader.StartTime') as bigint)/1000, 3) AS eventTime,
>> proctime() as proctime from kafkaJsonSource",
>>
>> *Query 2:*
>>
>> select count(case when result1=1 then 1 else null end)
>> failed_result,count(case when result1=0 then 1 else null end)
>> successful_result,count(case when errorCode like '4%' then 1 else null end)
>> err_starts_4,count(case when errorCode like '5%' then 1 else null end)
>> err_starts_5,count(case when errorCode like '6%' then 1 else null end)
>> err_starts_6,count(case when subCause is not null then 1 else null end)
>> has_sub_cause,count(case when subProtocol='DNS' then 1 else null end)
>> protocol_dns, count(case when subProtocol='Diameter' then 1 else null end)
>> protocol_diameter, count(case when (subProtocol='Diameter' and subError
>> like '3%') then 1 else null end) protocol_diameter_err_starts_3,count(case
>> when (subProtocol='Diameter' and subError like '4%') then 1 else null end)
>> protocol_diameter_err_starts_4,count(case when (subProtocol='Diameter' and
>> subError like '5%') then 1 else null end) protocol_diameter_err_starts_5
>> FROM TABLE(TUMBLE(TABLE filter_transformed, DESCRIPTOR(proctime), INTERVAL
>> '1' SECOND)) GROUP BY window_start, window_end;
>>
>> Please someone let use know, if there is some better way to do this .
>>
>> --
>> Regards
>> Soumen Choudhury
>> Cell : +91865316168
>> mail to : sou.tcs@gmail.com
>>
>

Re: Flink nested complex json parsing with multiple schema file

Posted by Yaroslav Tkachenko <ya...@goldsky.io>.
Hi Soumen,

I'd try parsing the input using the DataStream API (with a fast JSON
library) and then converting it to a Table.

On Thu, Jul 21, 2022 at 6:22 AM Soumen Choudhury <so...@gmail.com> wrote:

> We have a requirement of parsing a very complex json (size around 25 kb
> per event) event with a predefined schema (nested schema, with multiple
> schema files ) and create a temporary table and from temp table we have to
> apply some case statement based some fields( eg. to find out success,
> failure count , status code ) and do a aggregation in 1 sec interval.
>
> We have tried with inbuilt *JSON_VALUE* function to retrieve some field
> value and then apply the case statement, but as I am using JSON_VALUE more
> than 5/6 times, the application is performing very slow.
>
> For some other filtering use case we are able to receive more that 1600
> event/sec, but for this case we are only receiving around 300 event/sec for
> 1 core .
>
> Below is the query example:
>
> *Query 1:*
>
> "select cast(JSON_QUERY(message, '$.eventRecordHeader.Result'), bigint) AS
> result1, JSON_QUERY(message, '$.eventRecordHeader.Cause.ErrorCode' ) AS
> errorCode, JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause' ) AS
> subCause, JSON_QUERY(message,
> '$.eventRecordHeader.Cause.SubCause.SubProtocol' ) AS subProtocol,
> JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause.SubError' ) AS
> subError, TO_TIMESTAMP_LTZ(cast(JSON_QUERY(message,
> '$.eventRecordHeader.StartTime') as bigint)/1000, 3) AS eventTime,
> proctime() as proctime from kafkaJsonSource",
>
> *Query 2:*
>
> select count(case when result1=1 then 1 else null end)
> failed_result,count(case when result1=0 then 1 else null end)
> successful_result,count(case when errorCode like '4%' then 1 else null end)
> err_starts_4,count(case when errorCode like '5%' then 1 else null end)
> err_starts_5,count(case when errorCode like '6%' then 1 else null end)
> err_starts_6,count(case when subCause is not null then 1 else null end)
> has_sub_cause,count(case when subProtocol='DNS' then 1 else null end)
> protocol_dns, count(case when subProtocol='Diameter' then 1 else null end)
> protocol_diameter, count(case when (subProtocol='Diameter' and subError
> like '3%') then 1 else null end) protocol_diameter_err_starts_3,count(case
> when (subProtocol='Diameter' and subError like '4%') then 1 else null end)
> protocol_diameter_err_starts_4,count(case when (subProtocol='Diameter' and
> subError like '5%') then 1 else null end) protocol_diameter_err_starts_5
> FROM TABLE(TUMBLE(TABLE filter_transformed, DESCRIPTOR(proctime), INTERVAL
> '1' SECOND)) GROUP BY window_start, window_end;
>
> Please someone let use know, if there is some better way to do this .
>
> --
> Regards
> Soumen Choudhury
> Cell : +91865316168
> mail to : sou.tcs@gmail.com
>