Posted to users@nifi.apache.org by Krzysztof Zarzycki <k....@gmail.com> on 2018/12/07 21:24:04 UTC

How to consume avro messages with schema reference from Kafka, into large flowfiles

Hi everyone,
I think I have quite a standard problem and the answer is probably
quick, but I can't find it on the internet.
We have Avro messages in a Kafka topic, written with an HWX schema reference.
We're able to read them in with e.g. ConsumeKafkaRecord with an Avro reader.

Now we would like to merge the smaller flowfiles into larger files, because we
load these files into HDFS. What combination of processors should we use to
get this with the highest performance?
Option 1: ConsumeKafkaRecord with AvroReader and AvroRecordSetWriter, then
MergeRecord with AvroReader/AvroRecordSetWriter. It works and seems
straightforward, but it looks like there are too many interpretations and
rewrites of the records. Each record interpretation is an unnecessary cost of
deserialization and then serialization through the Java heap.
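Roughly, what I mean by Option 1 (property and option names from memory, so
treat this as a sketch rather than an exact configuration):

    ConsumeKafkaRecord_1_0
        Record Reader = AvroReader
            Schema Access Strategy = HWX Content-Encoded Schema Reference
            Schema Registry        = HortonworksSchemaRegistry controller service
        Record Writer = AvroRecordSetWriter
      -> MergeRecord
        Record Reader = AvroReader
        Record Writer = AvroRecordSetWriter
        Minimum Number of Records / Max Bin Age sized for the HDFS files we want
      -> PutHDFS (or PutParquet)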

Option 2: somehow configure ConsumeKafka and MergeContent to do this? We
used this combination for simple JSONs (with binary concatenation), but we
can't get it right with Avro messages that carry a schema reference (the
PutParquet processor can't read the merged files with AvroReader). On the
other hand, this should be the fastest, as there is no data interpretation,
just a byte-for-byte rewrite. Maybe we just haven't tried the right
configuration combination?

Maybe other options?

Thank you for any advice.
Krzysztof

Re: How to consume avro messages with schema reference from Kafka, into large flowfiles

Posted by Pierre Villard <pi...@gmail.com>.
We made quite a few improvements around all of this and I'd suggest trying
NiFi 1.8.
I've recently worked on some use cases where we processed millions of
events per second with no issue.

Back to the original description of your use case... as Bryan explained,
the fact that the schema reference is contained in the message is making
things harder. I don't know if it'd fit your use case, but with a recent
version of Kafka you could store the information about the schema in the
headers associated with the message. Then the message could be raw Avro with
no schema. This way you could easily merge things. And it's up to you whether
you want to use the record processors or not (depends on your requirements).
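As a rough illustration of what I mean (the header names are just examples,
and the Avro serialization is whatever your producing application already
does), the producer side could look something like this:

    // Sketch only: send bare Avro bytes and carry the schema identity in Kafka
    // record headers instead of prepending a schema reference to the payload.
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class RawAvroWithHeadersProducer {
        public static void send(byte[] bareAvroPayload) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka:9092"); // example broker address
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());

            try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
                // The payload stays raw Avro with no schema and no schema-ref prefix,
                // so consumers can merge messages byte for byte.
                ProducerRecord<String, byte[]> record =
                        new ProducerRecord<>("events", bareAvroPayload);
                // The schema identity travels in headers (names here are made up).
                record.headers().add("schema.name", "my-event".getBytes(StandardCharsets.UTF_8));
                record.headers().add("schema.version", "1".getBytes(StandardCharsets.UTF_8));
                producer.send(record);
            }
        }
    }

If I remember correctly, the ConsumeKafka processors for broker 0.11 and later
can then copy those headers onto flowfile attributes on the NiFi side.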

Pierre


Re: How to consume avro messages with schema reference from Kafka, into large flowfiles

Posted by Viking K <cy...@hotmail.com>.
As Bryan said, knowing some more about your setup and expectations would help.

But I can give some information from my own evaluation of a similar setup.
I needed a new data pipeline into a data lake; the setup of the downstream systems was like this.

Downstream components:

  *   Multiple Kafka Topics (8+)
     *   Topics have 4+ partitions.
     *   Every topic can hold multiple message types (different schemas)
     *   Only bare Avro records are stored in the topics; the schemas are available in the registry.

My requirements for going ahead with NiFi:

  *   I want to consume messages at a rate of at least 30,000 messages per second.
  *   I want no messages lost in transit during normal operation.

Results:
After evaluation, the fastest way to consume messages resulted in this ingress data flow.

  1.  ConsumeKafka
  2.  ConvertRecord
  3.  UpdateAttribute
  4.  MergeContent
  5.  MergeContent

Step - ConsumeKafka:
Using ConsumeKafka is the fastest way to pull records from a Kafka topic.
I also told the downstream systems to add Kafka headers to the bare records so I could easily know where each message belongs, along with timestamps.
ConsumeKafkaRecord does a lot of serialization, as Bryan mentioned, which affects performance.
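For reference, this is roughly the only header-related setting involved
(property name from memory; the header names themselves are whatever the
producing applications chose):

    ConsumeKafka_1_0
        Headers to Add as Attributes (Regex) = messagetype|eventtime

Each consumed flowfile then carries those header values as attributes for the
later steps.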

Step - ConvertRecord:
ConvertRecord uses the same serialization as ConsumeKafkaRecord, but in this step you can just throw threads at the process to increase throughput.
I allocate 8-16 threads to the ConvertRecord processors while I have only 1 thread running on ConsumeKafka.
Here I convert the bare Avro records to Avro messages (with the schema embedded).
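The controller services behind that ConvertRecord look roughly like this
(option names from memory, so a sketch only; messagetype is the attribute
populated from the Kafka header):

    ConvertRecord
        Record Reader = AvroReader
            Schema Access Strategy = Use 'Schema Name' Property
            Schema Name            = ${messagetype}
            Schema Registry        = HortonworksSchemaRegistry controller service
        Record Writer = AvroRecordSetWriter
            Schema Write Strategy  = Embed Avro Schema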

Step - UpdateAttribute:
Running with ConsumeKafka and ConvertRecord generates a LOT of flowfiles, and these need to be compacted in the end. I use UpdateAttribute to create a composite key based on the Kafka headers (messagetype+date).
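Something like this in UpdateAttribute (the attribute name merge.key is just my
own convention; messagetype is the attribute coming from the Kafka header):

    merge.key = ${messagetype}_${now():format('yyyyMMdd')}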

Step - MergeContent:
All incoming flowfiles are merged based on the composite key (messagetype+date) using the Avro merge format.
Here the flowfiles are compacted into bigger files with more records for easier management.
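As a sketch, the first MergeContent is configured roughly like this (the
numbers are only examples and depend on the target file size):

    MergeContent
        Merge Strategy             = Bin-Packing Algorithm
        Merge Format               = Avro
        Correlation Attribute Name = merge.key
        Minimum Number of Entries  = 10000
        Maximum Number of Entries  = 100000
        Max Bin Age                = 5 min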

Step - MergeContent:
The recommendation is to have multiple merge steps instead of one big one.


Performance Optimizations:
https://community.hortonworks.com/articles/7882/hdfnifi-best-practices-for-setting-up-a-high-perfo.html
The biggest bottleneck in this setup is the provenance repository, so running it as volatile is good if lineage is not needed.
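For example, in nifi.properties (property names as of NiFi 1.x; provenance
events are then kept only in memory and lost on restart):

    nifi.provenance.repository.implementation=org.apache.nifi.provenance.VolatileProvenanceRepository
    nifi.provenance.repository.buffer.size=100000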

End results:

Throughput: 20-90k messages per second based on message complexity.
Running on 2 servers with 4 NiFi instances:

  *   SSD raid for flowfile repository
  *   6 SAS drives for content repository
  *   48 cores
  *   200GB of RAM.


Re: How to consume avro messages with schema reference from Kafka, into large flowfiles

Posted by Bryan Bende <bb...@gmail.com>.
From your original email, option 1 is the correct approach. You are
right that it is performing extra deserialization/serialization, but
this is necessary to deal with the encoded schema references, which
effectively make these modified Avro records.

In option 2, if you take a whole bunch of records where the content is
"schema ref + bare Avro" and merge them together one after another,
there is nothing that understands how to read the result. It is not
valid Avro, and there are no readers that expect multiple messages
like this in a single flow file; that is why PutParquet can't read it.
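To make that concrete, here is a minimal sketch (plain Avro Java API, not NiFi
code) of the structure a reader does expect: an Avro container with one header
carrying the schema, followed by sync-marker-framed blocks of records. Naive
concatenation of schema-ref-prefixed messages produces none of this.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    public class AvroContainerSketch {
        // Builds the kind of content AvroReader or PutParquet can actually consume.
        static byte[] writeContainer(Schema schema, Iterable<GenericRecord> records)
                throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (DataFileWriter<GenericRecord> writer =
                     new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
                writer.create(schema, out); // header: magic bytes, schema JSON, sync marker
                for (GenericRecord record : records) {
                    writer.append(record);  // records go into framed blocks, not bare concatenation
                }
            }
            return out.toByteArray();
        }
    }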

In order to understand the slowdown we will need more info...

- What version of Kafka broker are you using?
- Are you using the corresponding version of the record processor?
(i.e. if the broker is 1.0.0 then you should use ConsumeKafkaRecord_1_0)
- How many partitions does your topic have?
- How many nodes in your NiFi cluster?
- How many concurrent tasks configured for ConsumeKafkaRecord?
- What is the record batch size for ConsumeKafkaRecord?


Re: How to consume avro messages with schema reference from Kafka, into large flowfiles

Posted by Krzysztof Zarzycki <k....@gmail.com>.
Hello,
I'm just bumping this thread up; if someone knows how to make the Avro message
consumption faster, I would be grateful.
Some more info: when we switched from ConsumeKafka with JSONs to
ConsumeKafkaRecord with Avro messages, we experienced a serious slowdown
(multiple X). I can get more precise data on the slowdown, but my question
about a ConsumeKafka/MergeContent based flow becomes even more relevant to
me.
Or maybe I'm doing something wrong that makes ConsumeKafkaRecord so much slower?

BTW, I'm on NiFi 1.7.1.

Thank you,
Krzysztof Zarzycki

