Posted to dev@druid.apache.org by Lokesh Lingarajan <ll...@confluent.io.INVALID> on 2021/08/30 21:43:51 UTC

[Proposal] - Kafka Input Format for headers, key and payload parsing

Motivation

Today we ingest a number of high-cardinality metrics into Druid across
dimensions. These metrics are rolled up on a per-minute basis and are very
useful when looking at metrics on a per-partition or per-client basis.
Events are another class of data that provides useful information about a
particular incident or scenario inside a Kafka cluster. The events
themselves are carried in the Kafka payload, but there is also very useful
metadata carried in the Kafka headers that can serve as dimensions for
aggregation and, in turn, bring better insights.

PR #10730 <https://github.com/apache/druid/pull/10730> introduced support
for Kafka headers in InputFormats.

We still need an input format to parse the headers and translate them into
relevant columns in Druid. Until that is implemented, none of the
information available in the Kafka message headers is exposed. So first, we
need an input format that can parse headers in any given format (provided
we support the format), just as we parse payloads today. Apart from the
headers, there is also useful information in the key portion of the Kafka
record, and we need a way to expose the data in the key as Druid columns.
We need a generic way to express, at configuration time, which attributes
from the headers, key and payload should be ingested into Druid. The design
must be generic enough that users can specify different parsers for
headers, key and payload.

The proposal is to design an input format that solves the above by
providing a wrapper around any existing input formats and merging the data
into a single unified Druid row.

Proposed changes

Let's look at a sample input format from the above discussion:

    "inputFormat": {
        "type": "kafka", // New input format type
        "headerLabelPrefix": "kafka.header.", // Label prefix for header columns, this will avoid collisions while merging columns
        "recordTimestampLabelPrefix": "kafka.", // Kafka record's timestamp is made available in case payload does not carry timestamp
        "headerFormat": // Header parser specifying that values are of type string
        {
            "type": "string"
        },
        "valueFormat": // Value parser from json parsing
        {
            "type": "json",
            "flattenSpec": {
                "useFieldDiscovery": true,
                "fields": [...]
            }
        },
        "keyFormat": // Key parser also from json parsing
        {
            "type": "json"
        }
    }

Since we have independent sections for header, key and payload, each
section can be parsed with its own parser, e.g., headers coming in as
strings and the payload as JSON.

KafkaInputFormat (the new InputFormat class) will be the uber class
implementing the InputFormat interface. It will be responsible for creating
individual parsers for the header, key and payload, blending the data while
resolving column conflicts, and generating a single unified InputRow for
Druid ingestion.
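
A minimal sketch of the blending step described above, for illustration
only. The class and method names are hypothetical and are not taken from the
PR, and the conflict rule used here (payload columns win, then key, then
headers) is an assumption; the proposal only says that conflicts are
resolved.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not the code from the PR: shows one way the three
// parsed sections of a Kafka record could be blended into a single row.
final class KafkaRowMergeSketch {

    // Assumed precedence: payload columns win, then key, then headers.
    static Map<String, Object> merge(Map<String, Object> headerColumns,
                                     Map<String, Object> keyColumns,
                                     Map<String, Object> payloadColumns) {
        Map<String, Object> row = new LinkedHashMap<>(payloadColumns);
        // putIfAbsent keeps the existing value when a column name collides.
        keyColumns.forEach(row::putIfAbsent);
        headerColumns.forEach(row::putIfAbsent);
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Map.of("kafka.header.env", "prod");
        Map<String, Object> key = Map.of("kafka.key", "client-42");
        Map<String, Object> payload = Map.of("status", "ok", "latencyMs", 17);
        // Prints one merged row containing all four columns.
        System.out.println(merge(headers, key, payload));
    }
}
```

With the default prefixes, header and key columns rarely collide with
payload columns, so the precedence rule only matters when a prefix is
overridden or the payload itself uses a "kafka."-style name.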

"headerFormat" will allow users to plug in a parser type for the header
values. Header attributes get the prefix "kafka.header." by default (this
can be overridden via "headerLabelPrefix") to avoid collisions when they are
merged with payload attributes.
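
To illustrate the prefixing, here is a small sketch that assumes header
values are UTF-8 strings (the "string" header format in the sample above).
A plain Map<String, byte[]> stands in for the Kafka headers so the example
has no Kafka dependency; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: turns raw header bytes into prefixed string columns.
final class HeaderPrefixSketch {

    static Map<String, Object> toColumns(Map<String, byte[]> rawHeaders,
                                         String headerLabelPrefix) {
        Map<String, Object> columns = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> header : rawHeaders.entrySet()) {
            // Assumption: every header value is a UTF-8 encoded string.
            String value = new String(header.getValue(), StandardCharsets.UTF_8);
            // The prefix keeps header names apart from payload column names.
            columns.put(headerLabelPrefix + header.getKey(), value);
        }
        return columns;
    }

    public static void main(String[] args) {
        Map<String, byte[]> raw = new LinkedHashMap<>();
        raw.put("env", "prod".getBytes(StandardCharsets.UTF_8));
        // The header "env" becomes the column "kafka.header.env".
        System.out.println(toColumns(raw, "kafka.header."));
    }
}
```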

The Kafka payload parser will be responsible for parsing the value portion
of the Kafka record. This is where most of the data will come from, and we
should be able to plug in existing parsers. One thing to note is that if
batching is performed, the code should add the header and key values to
every record in the batch.
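
The batching note can be sketched as follows, assuming the payload parser
returns several rows for one Kafka record. The names are hypothetical, and
letting payload values win on a name collision is an assumption rather than
something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: copies the record-level header and key columns onto
// every row that was parsed out of a batched payload.
final class BatchAugmentSketch {

    static List<Map<String, Object>> augment(List<Map<String, Object>> payloadRows,
                                             Map<String, Object> headerAndKeyColumns) {
        List<Map<String, Object>> out = new ArrayList<>(payloadRows.size());
        for (Map<String, Object> payloadRow : payloadRows) {
            Map<String, Object> row = new LinkedHashMap<>(payloadRow);
            // Assumption: payload values are kept when names collide.
            headerAndKeyColumns.forEach(row::putIfAbsent);
            out.add(row);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> batch = new ArrayList<>();
        batch.add(Map.of("id", 1));
        batch.add(Map.of("id", 2));
        Map<String, Object> shared = Map.of("kafka.key", "client-42");
        // Both output rows carry the same "kafka.key" column.
        System.out.println(augment(batch, shared));
    }
}
```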

The Kafka key parser will handle parsing the key portion of the Kafka record
and will ingest the key with the dimension name "kafka.key".

Operational impact, Test plan & Future work

Since we had an immediate need to ingest blended data from header and
payload, we have implemented the above proposal in a PR:
<https://github.com/apache/druid/pull/11630>

-Lokesh Lingarajan

Re: [Proposal] - Kafka Input Format for headers, key and payload parsing

Posted by Gian Merlino <gi...@apache.org>.
Hey Lokesh,

Thanks for the details. To me it makes more sense to have the user specify
the entire timestamp and key field name (it seems weird to have a
"timestamp prefix" and "key prefix" that are only used for single fields).
I just wrote that + a few comments on the PR itself:
https://github.com/apache/druid/pull/11630#pullrequestreview-760351816

On Fri, Sep 17, 2021 at 9:43 AM Lokesh Lingarajan <ll...@confluent.io>
wrote:

> Hi Gian,
>
> Thanks for your reply. Please find my comments below.
>
> 1) How is the timestamp exposed exactly? I see there is a
> recordTimestampLabelPrefix, but what is that a prefix to? Also: what do you
> think about accepting the entire name of the timestamp field instead?
> Finally: in the docs it would be good to have an example of how people can
> write a timestampSpec that refers to the Kafka timestamp, and also how they
> can load the Kafka timestamp as a long-typed dimension storing millis since
> the epoch (our convention for secondary timestamps).
>
> >>> The input format allows users to pick the timestamp value from the
> header, key or value portion of the Kafka record. If the timestamp is
> missing in both the key and value parts, users can always default to the
> timestamp that is available in the header. The code names this column
> "kafka.timestamp" by default; recordTimestampLabelPrefix allows users to
> change the "kafka" to something else. If this model deviates from what we
> currently have in Druid, then I agree we should change this to accept a
> full name. Currently the timestamp is loaded directly from the
> ConsumerRecord<K, V> data structure as follows:
>
> // Add kafka record timestamp to the mergelist, we will skip record timestamp if the same key exists already in the header list
> mergeMap.putIfAbsent(recordTimestampColumn, record.getRecord().timestamp());
>
>
> 2) You mention that the key will show up as "kafka.key", and in the
> example you provide I don't see a parameter enabling a choice of what that
> field is called. Is it hard-coded or is it configurable somehow?
>
> >>> This behavior is exactly the same as for the timestamp discussed
> above. If nothing is done, we will have a column named "kafka.key"; users
> have the choice to change the "kafka" to something else. We can make the
> change uniform here as well, based on the above decision.
>
> 3) Could you write up some user-facing docs too, like an addition to
> development/extensions-core/kafka-ingestion.md? That way, people will know
> how to use this feature. And it'll help us better understand how it's
> supposed to work. (Perhaps it could have answered the two questions above)
>
> >>> Absolutely agree with you, I will do that along with other review
> comments from the code.
>
> Thanks again for looking into this :)
>
> -Lokesh
>
>
> On Thu, Sep 16, 2021 at 9:34 AM Gian Merlino <gi...@apache.org> wrote:
>
>> Lokesh, it looks like you got dropped from the thread, so I'm adding you
>> back. Please check out the previous message for some comments.
>>
>> By the way, by default, replies to the dev list go back to the dev list
>> only, which can cause you to miss some replies. If you join the list you
>> will be sure to get all your replies 🙂
>>
>> On Tue, Sep 14, 2021 at 10:10 PM Gian Merlino <gi...@apache.org> wrote:
>>
>>> Hey Lokesh,
>>>
>>> The concept and API look solid to me! Thank you for writing this up. I
>>> agree with Ben's comment. This will be really useful functionality.
>>>
>>> I have a few questions about how it would work:
>>>
>>> 1) How is the timestamp exposed exactly? I see there is a
>>> recordTimestampLabelPrefix, but what is that a prefix to? Also: what do you
>>> think about accepting the entire name of the timestamp field instead?
>>> Finally: in the docs it would be good to have an example of how people can
>>> write a timestampSpec that refers to the Kafka timestamp, and also how they
>>> can load the Kafka timestamp as a long-typed dimension storing millis since
>>> the epoch (our convention for secondary timestamps).
>>>
>>> 2) You mention that the key will show up as "kafka.key", and in the
>>> example you provide I don't see a parameter enabling a choice of what that
>>> field is called. Is it hard-coded or is it configurable somehow?
>>>
>>> 3) Could you write up some user-facing docs too, like an addition to
>>> development/extensions-core/kafka-ingestion.md? That way, people will know
>>> how to use this feature. And it'll help us better understand how it's
>>> supposed to work. (Perhaps it could have answered the two questions above)
>>>
>>> Full disclosure: I haven't reviewed the patch yet; these questions are
>>> just based on your writeup.
>>>

Re: [Proposal] - Kafka Input Format for headers, key and payload parsing

Posted by Lokesh Lingarajan <ll...@confluent.io.INVALID>.
Hi Gian,

Thanks for your reply. Please find my comments below.

1) How is the timestamp exposed exactly? I see there is a
recordTimestampLabelPrefix, but what is that a prefix to? Also: what do you
think about accepting the entire name of the timestamp field instead?
Finally: in the docs it would be good to have an example of how people can
write a timestampSpec that refers to the Kafka timestamp, and also how they
can load the Kafka timestamp as a long-typed dimension storing millis since
the epoch (our convention for secondary timestamps).

>>> The input format allows users to pick the timestamp value from the
header, key or value portion of the Kafka record. If the timestamp is
missing in both the key and value parts, users can always default to the
timestamp that is available in the header. The code names this column
"kafka.timestamp" by default; recordTimestampLabelPrefix allows users to
change the "kafka" to something else. If this model deviates from what we
currently have in Druid, then I agree we should change this to accept a
full name. Currently the timestamp is loaded directly from the
ConsumerRecord<K, V> data structure as follows:

// Add the Kafka record timestamp to the merge list; we skip the record
// timestamp if the same key already exists in the header list.
mergeMap.putIfAbsent(recordTimestampColumn, record.getRecord().timestamp());
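
For readers following along, a self-contained sketch of that behavior. It
assumes the column name is built as the prefix plus "timestamp" (inferred
from the default "kafka.timestamp" mentioned above); the class and method
names are hypothetical, and a plain long stands in for the value returned by
the record's timestamp() call.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper mirroring the putIfAbsent call quoted above.
final class RecordTimestampSketch {

    static Map<String, Object> addRecordTimestamp(Map<String, Object> headerColumns,
                                                  String recordTimestampLabelPrefix,
                                                  long recordTimestampMillis) {
        Map<String, Object> mergeMap = new LinkedHashMap<>(headerColumns);
        // Assumption: the column name is the prefix followed by "timestamp".
        String recordTimestampColumn = recordTimestampLabelPrefix + "timestamp";
        // A header column that already uses this name is left untouched.
        mergeMap.putIfAbsent(recordTimestampColumn, recordTimestampMillis);
        return mergeMap;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("kafka.header.env", "prod");
        // Adds "kafka.timestamp" next to the existing header column.
        System.out.println(addRecordTimestamp(headers, "kafka.", 1630359831000L));
    }
}
```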


2) You mention that the key will show up as "kafka.key", and in the example
you provide I don't see a parameter enabling a choice of what that field is
called. Is it hard-coded or is it configurable somehow?

>>> This behavior is exactly the same as for the timestamp discussed above.
If nothing is done, we will have a column named "kafka.key"; users have the
choice to change the "kafka" to something else. We can make the change
uniform here as well, based on the above decision.

3) Could you write up some user-facing docs too, like an addition to
development/extensions-core/kafka-ingestion.md? That way, people will know
how to use this feature. And it'll help us better understand how it's
supposed to work. (Perhaps it could have answered the two questions above)

>>> Absolutely agree with you, I will do that along with other review
comments from the code.

Thanks again for looking into this :)

-Lokesh


On Thu, Sep 16, 2021 at 9:34 AM Gian Merlino <gi...@apache.org> wrote:

> Lokesh, it looks like you got dropped from the thread, so I'm adding you
> back. Please check out the previous message for some comments.
>
> By the way, by default, replies to the dev list go back to the dev list
> only, which can cause you to miss some replies. If you join the list you
> will be sure to get all your replies 🙂
>
> On Tue, Sep 14, 2021 at 10:10 PM Gian Merlino <gi...@apache.org> wrote:
>
>> Hey Lokesh,
>>
>> The concept and API look solid to me! Thank you for writing this up. I
>> agree with Ben's comment. This will be really useful functionality.
>>
>> I have a few questions about how it would work:
>>
>> 1) How is the timestamp exposed exactly? I see there is a
>> recordTimestampLabelPrefix, but what is that a prefix to? Also: what do you
>> think about accepting the entire name of the timestamp field instead?
>> Finally: in the docs it would be good to have an example of how people can
>> write a timestampSpec that refers to the Kafka timestamp, and also how they
>> can load the Kafka timestamp as a long-typed dimension storing millis since
>> the epoch (our convention for secondary timestamps).
>>
>> 2) You mention that the key will show up as "kafka.key", and in the
>> example you provide I don't see a parameter enabling a choice of what that
>> field is called. Is it hard-coded or is it configurable somehow?
>>
>> 3) Could you write up some user-facing docs too, like an addition to
>> development/extensions-core/kafka-ingestion.md? That way, people will know
>> how to use this feature. And it'll help us better understand how it's
>> supposed to work. (Perhaps it could have answered the two questions above)
>>
>> Full disclosure: I haven't reviewed the patch yet; these questions are
>> just based on your writeup.
>>

Re: [Proposal] - Kafka Input Format for headers, key and payload parsing

Posted by Gian Merlino <gi...@apache.org>.
Lokesh, it looks like you got dropped from the thread, so I'm adding you
back. Please check out the previous message for some comments.

By the way, by default, replies to the dev list go back to the dev list
only, which can cause you to miss some replies. If you join the list you
will be sure to get all your replies 🙂

On Tue, Sep 14, 2021 at 10:10 PM Gian Merlino <gi...@apache.org> wrote:

> Hey Lokesh,
>
> The concept and API look solid to me! Thank you for writing this up. I
> agree with Ben's comment. This will be really useful functionality.
>
> I have a few questions about how it would work:
>
> 1) How is the timestamp exposed exactly? I see there is a
> recordTimestampLabelPrefix, but what is that a prefix to? Also: what do you
> think about accepting the entire name of the timestamp field instead?
> Finally: in the docs it would be good to have an example of how people can
> write a timestampSpec that refers to the Kafka timestamp, and also how they
> can load the Kafka timestamp as a long-typed dimension storing millis since
> the epoch (our convention for secondary timestamps).
>
> 2) You mention that the key will show up as "kafka.key", and in the
> example you provide I don't see a parameter enabling a choice of what that
> field is called. Is it hard-coded or is it configurable somehow?
>
> 3) Could you write up some user-facing docs too, like an addition to
> development/extensions-core/kafka-ingestion.md? That way, people will know
> how to use this feature. And it'll help us better understand how it's
> supposed to work. (Perhaps it could have answered the two questions above)
>
> Full disclosure: I haven't reviewed the patch yet; these questions are
> just based on your writeup.
>

Re: [Proposal] - Kafka Input Format for headers, key and payload parsing

Posted by Gian Merlino <gi...@apache.org>.
Hey Lokesh,

The concept and API look solid to me! Thank you for writing this up. I
agree with Ben's comment. This will be really useful functionality.

I have a few questions about how it would work:

1) How is the timestamp exposed exactly? I see there is a
recordTimestampLabelPrefix, but what is that a prefix to? Also: what do you
think about accepting the entire name of the timestamp field instead?
Finally: in the docs it would be good to have an example of how people can
write a timestampSpec that refers to the Kafka timestamp, and also how they
can load the Kafka timestamp as a long-typed dimension storing millis since
the epoch (our convention for secondary timestamps).

2) You mention that the key will show up as "kafka.key", and in the example
you provide I don't see a parameter enabling a choice of what that field is
called. Is it hard-coded or is it configurable somehow?

3) Could you write up some user-facing docs too, like an addition to
development/extensions-core/kafka-ingestion.md? That way, people will know
how to use this feature. And it'll help us better understand how it's
supposed to work. (Perhaps it could have answered the two questions above)

Full disclosure: I haven't reviewed the patch yet; these questions are just
based on your writeup.

On Mon, Aug 30, 2021 at 3:00 PM Lokesh Lingarajan
<ll...@confluent.io.invalid> wrote:

> Motivation
>
> Today we ingest a number of high cardinality metrics into Druid across
> dimensions. These metrics are rolled up on a per minute basis, and are very
> useful when looking at metrics on a partition or client basis. Events is
> another class of data that provides useful information about a particular
> incident/scenario inside a Kafka cluster. Events themselves are carried
> inside the kafka payload, but nonetheless there is some very useful
> metadata that is carried in kafka headers that can serve as a useful
> dimension for aggregation and in turn bringing better insights.
>
> PR(#10730 <https://github.com/apache/druid/pull/10730>) introduced support
> for Kafka headers in InputFormats.
>
> We still need an input format to parse out the headers and translate those
> into relevant columns in Druid. Until that’s implemented, none of the
> information available in the Kafka message headers would be exposed. So
> first there is a need to implement an input format that can parse headers
> in any given format(provided we support the format) like we parse payloads
> today. Apart from headers there is also some useful information present in
> the key portion of the kafka record. We also need a way to expose the data
> present in the key as druid columns. We need a generic way to express at
> configuration time what attributes from headers, key and payload need to be
> ingested into druid. We need to keep the design generic enough so that
> users can specify different parsers for headers, key and payload.
>
> Proposal is to design an input format to solve the above by providing
> wrapper around any existing input formats and merging the data into a
> single unified Druid row.
> Proposed changes
>
> Let's look at a sample input format from the above discussion
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> "inputFormat": {
>   "type": "kafka",  // New input format type
>   "headerLabelPrefix": "kafka.header.",  // Label prefix for header columns, this will avoid collisions while merging columns
>   "recordTimestampLabelPrefix": "kafka.",  // Kafka record's timestamp is made available in case payload does not carry timestamp
>   "headerFormat": {  // Header parser specifying that values are of type string
>     "type": "string"
>   },
>   "valueFormat": {  // Value parser from json parsing
>     "type": "json",
>     "flattenSpec": {
>       "useFieldDiscovery": true,
>       "fields": [...]
>     }
>   },
>   "keyFormat": {  // Key parser also from json parsing
>     "type": "json"
>   }
> }
>
> Since we have independent sections for header, key and payload, each
> section can be parsed with its own parser, e.g., headers coming in as
> strings and the payload as JSON.
>
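To make the per-section parser idea concrete, here is a minimal sketch in Python rather than Druid's Java. The PARSERS registry and parse_section helper are hypothetical names used only for illustration; they are not part of Druid or of the PR.

```python
import json
from typing import Any, Callable, Dict

# Hypothetical registry: format "type" -> function that parses raw bytes.
PARSERS: Dict[str, Callable[[bytes], Any]] = {
    "string": lambda raw: raw.decode("utf-8"),
    "json": lambda raw: json.loads(raw.decode("utf-8")),
}


def parse_section(format_spec: Dict[str, Any], raw: bytes) -> Any:
    """Parse one section of a record with the parser named in its format spec."""
    return PARSERS[format_spec["type"]](raw)


# Same section layout as the sample input format, reduced to the "type" fields.
spec = {
    "headerFormat": {"type": "string"},
    "valueFormat": {"type": "json"},
    "keyFormat": {"type": "json"},
}

header_value = parse_section(spec["headerFormat"], b"prod")
payload = parse_section(spec["valueFormat"], b'{"metric": "bytes_in", "value": 42}')
```

Each section only needs to name a format type, so a new parser can be added without touching the other sections.
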
> KafkaInputFormat (the new input format class) will be the uber class
> implementing the InputFormat interface. It will be responsible for
> creating individual parsers for header, key and payload, blending the
> data while resolving column conflicts, and generating a single unified
> InputRow for Druid ingestion.
>
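A rough sketch of the blending step, in Python for brevity. The proposal text does not say how column conflicts are resolved, so this assumes the payload value wins on a name collision; merge_row is a hypothetical helper, not a Druid API.

```python
from typing import Any, Dict


def merge_row(
    header_cols: Dict[str, Any],
    key_cols: Dict[str, Any],
    payload_cols: Dict[str, Any],
) -> Dict[str, Any]:
    """Blend the three parsed sections into one flat row.

    Assumption: on a column-name collision the payload value wins.
    """
    row: Dict[str, Any] = {}
    row.update(header_cols)
    row.update(key_cols)
    row.update(payload_cols)  # applied last, so the payload overrides on collision
    return row


row = merge_row(
    {"kafka.header.env": "prod"},
    {"kafka.key": "client-7"},
    {"metric": "bytes_in", "value": 42},
)
```

With prefixed header and key columns, collisions should be rare; the precedence rule only matters when a payload field happens to use one of the prefixed names.
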
> "headerFormat" will allow users to plug in a parser type for the header
> values. Header attributes get the default prefix "kafka.header." (which
> can be overridden) to avoid collisions when merging them with the
> payload.
>
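As an illustration of the prefixing, here is a small Python sketch. It assumes header values are UTF-8 encoded strings and that a repeated header name keeps the last value; header_columns is a hypothetical helper, not code from the PR.

```python
from typing import Dict, Iterable, Tuple


def header_columns(
    headers: Iterable[Tuple[str, bytes]],
    prefix: str = "kafka.header.",
) -> Dict[str, str]:
    """Decode header values as UTF-8 strings and prefix each header name.

    Assumption: if a header name repeats, the last value is kept.
    """
    return {prefix + name: value.decode("utf-8") for name, value in headers}


cols = header_columns([("env", b"prod"), ("source", b"broker-3")])
custom = header_columns([("env", b"prod")], prefix="hdr.")
```
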
> The Kafka payload parser will be responsible for parsing the value
> portion of the Kafka record. This is where most of the data will come
> from, and we should be able to plug in existing parsers. Note that if
> batching is performed, the code should add the header and key values to
> every record in the batch.
>
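The batching note could look roughly like this in Python. The sketch assumes a batched payload is newline-delimited JSON, which the proposal does not specify, and that payload fields override the shared columns on a collision; rows_from_record is a hypothetical name.

```python
import json
from typing import Any, Dict, List


def rows_from_record(value: bytes, shared_cols: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Parse a newline-delimited JSON payload and add shared columns to every row."""
    rows: List[Dict[str, Any]] = []
    for line in value.decode("utf-8").splitlines():
        if not line.strip():
            continue  # skip blank lines in the batch
        row = dict(shared_cols)        # header and key columns first
        row.update(json.loads(line))   # assumption: payload overrides on collision
        rows.append(row)
    return rows


shared = {"kafka.header.env": "prod", "kafka.key": "client-7"}
batch = b'{"metric": "bytes_in", "value": 1}\n{"metric": "bytes_out", "value": 2}\n'
rows = rows_from_record(batch, shared)
```

Every row produced from the same Kafka record carries the same header and key columns, so they remain usable as dimensions after the batch is split.
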
> The Kafka key parser will handle parsing the key portion of the Kafka
> record and will ingest the key under the dimension name "kafka.key".
>
> Operational impact, Test plan & Future work
>
> Since we had an immediate need to ingest blended data from header and
> payload, we have implemented the above proposal in this PR:
> <https://github.com/apache/druid/pull/11630>
>
> -Lokesh Lingarajan
>

Re: [Proposal] - Kafka Input Format for headers, key and payload parsing

Posted by Lokesh Lingarajan <ll...@confluent.io.INVALID>.
Any updates/comments?

-Lokesh



Re: [Proposal] - Kafka Input Format for headers, key and payload parsing

Posted by Ben Krug <be...@imply.io>.
I'm not a coder, but wanted to say that I have heard other Druid users ask
for this functionality, so I think it would be useful.
Thank you!


Re: [Proposal] - Kafka Input Format for headers, key and payload parsing

Posted by Lokesh Lingarajan <ll...@confluent.io.INVALID>.
Hope everyone had a good long weekend. Any updates/comments?

-Lokesh

