Posted to user@flink.apache.org by Lehuede sebastien <le...@gmail.com> on 2018/04/23 13:03:09 UTC

KafkaJsonTableSource purpose

Hi Guys,

I'm currently trying to understand the purpose of the Table API, and in
particular KafkaJsonTableSource, to see if it can be useful for my use case.

Here is my context :

I send logs to Logstash, where I add some information (type, tags). Logstash
then sends the logs to Kafka in JSON format, and finally I use
Flink-Connector-Kafka to read from Kafka and parse the logs.


Before any processing, the events going from Kafka to Flink look like this:

*{"message":"accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2;
proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port:
32769","@timestamp":"2018-04-20T14:47:35.285Z","host":"FW"}*

Then I use "JSONDeserializationSchema" to deserialize the events:

    Properties properties = new Properties();  // bootstrap.servers, group.id, ...
    FlinkKafkaConsumer011<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer011<>(
            "Firewall", new JSONDeserializationSchema(), properties);
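
For reference, a minimal sketch of how this consumer plugs into a job
(assuming a standard streaming setup; the variable names are only
illustrative):

    // Build the streaming job: the consumer emits one Jackson ObjectNode per record
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<ObjectNode> events = env.addSource(kafkaConsumer);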

I take the value of the "message" key:

    public String map(ObjectNode value) throws Exception {
        // Extract the raw log line from the "message" field
        String message = value.get("message").asText();
        return message;
    }

Then I parse it with a Java regex and put each match group into a
String/int/... variable (a parsing sketch follows this list):

action : accept
service_id : domain-udp
src_ip : 1.1.1.1
dst_ip : 2.2.2.2
.....
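
A minimal sketch of that parsing step (my own illustration; a real pattern
would need to cover the full Check Point message format):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class FirewallLineParser {
        // Hypothetical pattern covering only the fields shown above
        private static final Pattern FW_PATTERN = Pattern.compile(
                "(\\w+) service_id: ([\\w-]+); src: ([\\d.]+); dst: ([\\d.]+);.*");

        /** Returns {action, service_id, src_ip, dst_ip}, or null if the line does not match. */
        public static String[] parse(String message) {
            Matcher m = FW_PATTERN.matcher(message);
            return m.matches()
                    ? new String[] {m.group(1), m.group(2), m.group(3), m.group(4)}
                    : null;
        }
    }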

Now I want to replace the "message" key with "rawMessage" and put each match
group into the JSON object to obtain this final result (a sketch of how I
picture building it follows the example):

*{"rawMessage":"accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2;
proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port: 32769",*
*"@timestamp":"2018-04-20T14:47:35.285Z",*
*"host":"FW",*
*"type":"firewall",*
*"tags":["Checkpoint"],*
*"action":"accept",*
*"service_id":"domain-udp",*
*"src_ip":"1.1.1.1",*
*"dst_ip":"2.2.2.2",*
*...}*
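
Here is a sketch of how I picture building that output inside the map
function, staying with Jackson's ObjectNode (the action, serviceId, srcIp,
and dstIp variables are assumed to come from the regex step above):

    // Copy the event, rename "message" to "rawMessage", add the parsed fields
    ObjectNode out = value.deepCopy();
    out.remove("message");
    out.put("rawMessage", value.get("message").asText());
    out.put("type", "firewall");
    out.putArray("tags").add("Checkpoint");
    out.put("action", action);
    out.put("service_id", serviceId);
    out.put("src_ip", srcIp);
    out.put("dst_ip", dstIp);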

I'm a newbie with streaming application technologies and with Flink, and for
the moment I'm still discovering how it works and what its different
functionalities are. But when I was looking for a solution to obtain my final
result, I came across KafkaJsonTableSource.

Does anyone think this could be a good solution for my use case?

I think I would be able to store the JSON from Kafka as a table, process the
data, modify the table, and then send the data to another Kafka topic. Is
that correct?

Regards,
Sebastien

Re: KafkaJsonTableSource purpose

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Sebastien,

I think you can do that with Flink's Table API / SQL and the
KafkaJsonTableSource.
Note that in Flink 1.4.2, the KafkaJsonTableSource does not support nested
JSON yet.
You'd also need a table-valued UDF (a user-defined table function) to parse
the message and join the result with the original row; a sketch follows
below. Depending on what you want to do, you might need additional UDFs.
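
To make that concrete, here is a rough sketch of what such a pipeline could
look like (my own illustration against the Flink 1.4-era APIs; the regex, the
schema fields, and all names are assumptions, not code from this thread):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;

    // Table-valued UDF: parses the raw message into typed fields,
    // emitting one row per successfully parsed message
    public class ParseFirewallMessage extends TableFunction<Row> {
        private static final Pattern FW_PATTERN = Pattern.compile(
                "(\\w+) service_id: ([\\w-]+); src: ([\\d.]+); dst: ([\\d.]+);.*");

        public void eval(String message) {
            Matcher m = FW_PATTERN.matcher(message);
            if (m.matches()) {
                collect(Row.of(m.group(1), m.group(2), m.group(3), m.group(4)));
            }
        }

        @Override
        public TypeInformation<Row> getResultType() {
            return new RowTypeInfo(
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        }
    }

    // Wiring (fragment; assumes tableEnv and the Kafka properties props exist)
    KafkaTableSource source = Kafka011JsonTableSource.builder()
            .forTopic("Firewall")
            .withKafkaProperties(props)
            .withSchema(TableSchema.builder()
                    .field("message", Types.STRING())
                    .field("host", Types.STRING())
                    .build())
            .build();

    tableEnv.registerTableSource("Logs", source);
    tableEnv.registerFunction("parseFw", new ParseFirewallMessage());

    // Cross-join each row with the parsed fields; sqlQuery() in newer
    // versions, sql() in older ones
    Table result = tableEnv.sqlQuery(
            "SELECT message AS rawMessage, host, action, service_id, src_ip, dst_ip " +
            "FROM Logs, LATERAL TABLE(parseFw(message)) AS T(action, service_id, src_ip, dst_ip)");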

Best,
Fabian


Re: KafkaJsonTableSource purpose

Posted by miki haiat <mi...@gmail.com>.
Hi,
Assuming that you're looking at a streaming use case, I think this is a
better approach:

   1. Send Avro from Logstash, for better performance.
   2. Deserialize it to a POJO (see the sketch after this list).
   3. Apply your logic.
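
For illustration, a rough sketch of step 2 using plain Avro APIs inside a
Flink DeserializationSchema (FirewallEvent is a hypothetical Avro-generated
class, this assumes raw binary Avro records without schema-registry framing,
and exact package names may vary by Flink version):

    import java.io.IOException;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;

    public class FirewallEventSchema implements DeserializationSchema<FirewallEvent> {

        // The Avro reader is not serializable, so create it lazily on the task
        private transient SpecificDatumReader<FirewallEvent> reader;

        @Override
        public FirewallEvent deserialize(byte[] bytes) throws IOException {
            if (reader == null) {
                reader = new SpecificDatumReader<>(FirewallEvent.class);
            }
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            return reader.read(null, decoder);
        }

        @Override
        public boolean isEndOfStream(FirewallEvent nextElement) {
            return false;
        }

        @Override
        public TypeInformation<FirewallEvent> getProducedType() {
            return TypeExtractor.getForClass(FirewallEvent.class);
        }
    }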



