Posted to users@nifi.apache.org by na...@bt.com on 2020/09/23 16:26:15 UTC

NiFi V1.9.2 Performance

Hi All,

We've got a three-node NiFi cluster running on 3 x 40 CPU, 256GB RAM (32G Java heap) servers. However, we have only been able to consume ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with a production rate of ~16.84GB out of the cluster over 5 minutes. This is much lower than we were expecting based on what we have read. With this throughput we see a CPU load of ~32 on all nodes, so we know there isn't much else we can get out of the CPU.

We have also tried SSDs, Raided and Unraided HDDs for the content repo storage, but they haven't made a difference to the amount we can process.

The process is as follows:

1. Our flow reads compressed data from Kafka (maximum of 2000 records per file). It then converts the records from Avro to JSON. (ConsumeKafka_0_10 --> UpdateAttribute --> ConvertRecord)

2. Depending on which topic the flow file is consumed from, we then send the message to one of 10 potential process groups, each containing between 3 and 5 processors. (RouteOnAttribute --> Relevant Processing Group containing JoltTransformJSON and several custom processors we have made).

3. Finally, we produce the flow file content back to one of several Kafka topics, chosen based on the input topic name, in Avro format with Snappy compression on the Kafka topic.

Inspecting the queued message counts, it indicates that the Jolt Transforms are taking the time to process (Large queues before JOLT processors, small or no queues afterwards). But I'm not sure why this is any worse than the rest of the processors as the event duration is less than a second when inspecting in provenance? We have tuned the number of concurrent tasks, duration and schedules to get the performance we have so far.

I'm not sure if there is anything anyone could recommend or suggest to try and make improvements? We need to achieve a rate around 5x of what it's currently processing with the same number of nodes. We are running out of ideas on how to accomplish this and may have to consider alternatives.

Kind Regards,

Nathan

Re: NiFi V1.9.2 Performance

Posted by Mark Payne <ma...@hotmail.com>.
Nathan,

For what it’s worth, I think that was just an oversight that the Kafka Topic name is not available when creating the Record Reader. I created a Jira [1] for it. Unfortunately, until that is updated, I think you’re right - your choices would likely be to use a separate processor for each topic, or to use the non-record-oriented approach. I don’t know enough about Avro off the top of my head, but you may be able to cram multiple messages into the same FlowFile using ConsumeKafka and using a Message Delimiter of empty-string, to directly concatenate the Avro payload? In that case, it should automatically group messages by their Kafka topic. But I feel like maybe that’s what you said you were doing to begin with?

Thanks
-Mark


[1] https://issues.apache.org/jira/browse/NIFI-7853

On Sep 25, 2020, at 7:10 AM, nathan.english@bt.com<ma...@bt.com> wrote:

Hi Matt,

We’ve now switched to the JoltTransformRecord processors. It does seem to be slightly better performing.

We are trying to switch out the ConsumeKafka_0_10 and ConvertRecord processors to the ConsumeKafkaRecord_0_10 processor based on feedback in this chain as well.

With the ConvertRecord processor, we used the kafka.topic attribute as it was available because the ConsumeKafka processor had completed on the flow file. The ConsumeKafkaRecord handles it all in one. When testing, it failed to parse as kafka.topic is not set.

As we are consuming from multiple topics each with a unique Avro schema the only way I can see to do this would be to have a separate processor and AvroReader controller service for each topic?

Kind Regards,

Nathan


From: Matt Burgess [mailto:mattyb149@apache.org]
Sent: 24 September 2020 17:08
To: users@nifi.apache.org<ma...@nifi.apache.org>
Subject: Re: NiFi V1.9.2 Performance

Nathan,

If you have multiple JSON messages in one flow file, is it in one large array, or a top-level JSON object with an array inside? Also are you trying to transform each message or the whole thing (i.e. do you need to know about more than one message at a time)? If you have a top-level array and are transforming each element in the array, you might get better performance out of JoltTransformRecord rather than JoltTransformJSON, as the latter reads the entire file into memory as a single JSON entity. If you have a top-level object then they will both read the whole thing in.
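To make the two layouts in that question concrete, here is a rough Python sketch. It is only an illustration of the JSON shapes, not NiFi or Jolt code: the field names and the transform function are hypothetical, and Python's json module loads the whole document either way, so it does not reproduce the memory difference between the two processors.

```python
import json


def transform(message):
    # Hypothetical per-message transform standing in for a Jolt spec.
    return {"id": message["id"], "host": message["src"].upper()}


# Shape 1: a top-level array with one element per message.
top_level_array = json.loads('[{"id": 1, "src": "a"}, {"id": 2, "src": "b"}]')

# Shape 2: a top-level object that wraps the array.
top_level_object = json.loads(
    '{"messages": [{"id": 1, "src": "a"}, {"id": 2, "src": "b"}]}'
)


def transform_each(records):
    # Record-at-a-time style: each element is handled independently,
    # which is the case where a record-oriented processor can help.
    for record in records:
        yield transform(record)


per_record = list(transform_each(top_level_array))

# Whole-entity style: the wrapper object has to be available in full
# before the inner array can be walked.
whole_entity = {"messages": [transform(m) for m in top_level_object["messages"]]}

print(per_record)
print(whole_entity)
```

If the flow files look like shape 1 and each message is transformed independently, the record-oriented processor is the one worth testing.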

Regards,
Matt


On Thu, Sep 24, 2020 at 10:25 AM <na...@bt.com>> wrote:
Hi Mark,

From what I can see (based on queues building before the processor and draining to basically empty after it), it is the Jolt processor we have problems with. We’ve tried adding more concurrency, reducing the run schedule and increasing the run duration, but it didn’t seem to resolve the high CPU load (~32 when processing at the rates described in my first email; when no traffic is processing it sits at 0.2).

It could be the completely wrong way of diagnosing this! I’ve struggled to find information (Apart from your great videos) to assist in getting to the bottom of it.

Kind Regards,

Nathan

From: Mark Payne [mailto:markap14@hotmail.com<ma...@hotmail.com>]
Sent: 24 September 2020 15:12
To: users@nifi.apache.org<ma...@nifi.apache.org>
Subject: Re: NiFi V1.9.2 Performance

Hey Nathan,

A quick clarification - is the bottleneck / the slow point in your flow actually consuming from Kafka or Jolt? From your original message it sounds like the bottleneck may actually be the Jolt Transform processor?

If the problem is in the ConsumeKafka processor, one thing you’ll want to look at is in the Settings tab, set the Yield Duration to “0 sec”. That can make a huge difference in performance from Kafka processors.

Thanks
-Mark


On Sep 24, 2020, at 10:07 AM, nathan.english@bt.com<ma...@bt.com> wrote:

Hi Bryan,

Thanks for this. My understanding of the concurrent tasks was incorrect. I thought it was across the whole cluster, not per node.

I did spend some time looking at the code for the demarcator as we had issues getting it batching. I think there may be a slight misunderstanding between my description and how it sounds.

When I say an Empty string, the message demarcator isn’t blank. I have used the checkbox ‘Set Empty String’, which means the processor treats the field as Null (From memory). If I left the field empty (checkbox not selected), it was one Kafka message to one flow file, which was a massive bottleneck.

I also seem to remember, from when I looked at the code, that the ConsumeKafkaRecord processors default the demarcator to null.

Kind Regards,

Nathan

From: Bryan Bende [mailto:bbende@gmail.com]
Sent: 24 September 2020 14:54
To: users@nifi.apache.org<ma...@nifi.apache.org>
Subject: Re: NiFi V1.9.2 Performance

Regarding the batching, I would have to double check the code, but since you said the demarcator is empty string, I think that means it is not batching and putting one message to one flow file. Basically if a demarcator is not set then batch size is ignored.

Regarding the processors/tasks, let's take one topic with 11 partitions as an example. If you make a consumer processor for this topic with 1 concurrent task, then you have 3 instances of this processor since you have a 3 node cluster, so you might end up with something like this...

node 1 - ConsumeKafka
  Task 1 - 4 partitions

node 2 - ConsumeKafka
  Task 1 - 4 partitions

node 3 - ConsumeKafka
  Task 1 - 3 partitions

It may not be exactly like that, but just an example as to how it should be assigned.

To add more parallelism you could then increase concurrent tasks up to maybe 4 and you get something like this...

node 1 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 2 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 3 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - nothing

If you go higher than 4 concurrent tasks you will just end up creating more consumers than partitions, and there is nothing to assign them.
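The layouts above can be reproduced with a small Python sketch. This is a simplified round-robin written for illustration only; it is an assumption, not Kafka's actual partition assignor, and the function names are made up for this example.

```python
def assign_partitions(num_partitions, num_nodes, tasks_per_node):
    """Spread partitions over every (node, task) consumer in turn.

    Simplified round-robin for illustration; a real Kafka assignor
    may place partitions differently.
    """
    consumers = [
        (node, task)
        for node in range(1, num_nodes + 1)
        for task in range(1, tasks_per_node + 1)
    ]
    assignment = {consumer: [] for consumer in consumers}
    for partition in range(num_partitions):
        assignment[consumers[partition % len(consumers)]].append(partition)
    return assignment


def idle_consumers(assignment):
    # Consumers that received no partition at all.
    return [consumer for consumer, parts in assignment.items() if not parts]


for tasks in (1, 4, 12):
    layout = assign_partitions(num_partitions=11, num_nodes=3, tasks_per_node=tasks)
    per_node = {}
    for (node, _task), parts in layout.items():
        per_node[node] = per_node.get(node, 0) + len(parts)
    print(tasks, "task(s) per node:", per_node, "idle consumers:", len(idle_consumers(layout)))
```

With 12 tasks per node this sketch leaves 25 of the 36 consumers idle. Which node ends up holding the busy consumers depends on the ordering used here and would differ with a real assignor, so treat the per-node split in that case as an artefact of the sketch.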


On Thu, Sep 24, 2020 at 9:30 AM <na...@bt.com>> wrote:
Hi Bryan,

We have configured the processor to read in a maximum batch size of 2k messages, which does seem to put more than one Kafka message in the flow file.

Completely understand on the Load balancing, we tried several iterations of 1 task to one topic partition. However, we still found it to be loaded towards one specific node. I will try splitting it into multiple processors to see if this handles it any better. We have 10 topics with 11 partitions. (one topic with 2 partitions). So I should set all concurrent tasks to 1 with multiple processors (One processor per topic)?

Kind Regards,

Nathan
From: Bryan Bende [mailto:bbende@gmail.com<ma...@gmail.com>]
Sent: 24 September 2020 13:59
To: users@nifi.apache.org<ma...@nifi.apache.org>
Subject: Re: NiFi V1.9.2 Performance

I'm not sure how many topics you have, but the biggest improvement would probably be to not do a single message per flow file. You want to batch together lots of messages coming out of ConsumeKafka using a demarcator, and then convert/transform them in batches. You may need to have a separate consumer processor for each topic in order to do this correctly.
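As a rough picture of what batching with a demarcator means, here is a Python sketch that bundles messages per topic and joins them with a delimiter. It is not the ConsumeKafka implementation: the function name, the newline demarcator and the flush-at-limit behaviour are all assumptions made for the example.

```python
from collections import defaultdict


def bundle_by_topic(messages, demarcator=b"\n", max_records=2000):
    """Group (topic, payload) pairs into per-topic bundles.

    Hypothetical helper: each bundle stands in for one flow file that
    carries many Kafka messages separated by the demarcator.
    """
    pending = defaultdict(list)
    bundles = []
    for topic, payload in messages:
        pending[topic].append(payload)
        if len(pending[topic]) == max_records:
            # Batch is full: emit it and start a new one for this topic.
            bundles.append((topic, demarcator.join(pending.pop(topic))))
    # Emit whatever is left over for each topic.
    for topic, payloads in pending.items():
        bundles.append((topic, demarcator.join(payloads)))
    return bundles


sample = [("http", b"m1"), ("dns", b"x1"), ("http", b"m2"), ("http", b"m3")]
for topic, content in bundle_by_topic(sample, max_records=2):
    print(topic, content)
```

The point is only that downstream processors then see a few large flow files per topic instead of one flow file per message.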

You generally shouldn't need to use a load balanced connection after ConsumeKafka. If one node is being favored, it is likely that the number of partitions in your topic is not lining up with the number of NiFi nodes x the number of concurrent consumer tasks. In the simplest case, if your topic had one partition, and you have 3 NiFi nodes with ConsumeKafka with 1 concurrent task each, then all your messages will only get consumed on one of the nodes. If you have 3 partitions then it should be roughly equal. In your case you have 3 nodes with concurrent tasks set to 12, so you have potentially 36 consumers, which means if you have anything less than 36 partitions then it is not going to be balanced equally.

On Thu, Sep 24, 2020 at 3:54 AM <na...@bt.com>> wrote:
Hi Joe,

The Raids seem to give us a good IOPS number when we’ve tried testing them. We have seen a 300ms wait time on the Content Repo, hence why we have tried SSDs for the content repo as we assumed that was the bottleneck. The other Raids seemed OK to us.

I will certainly look into the ConsumeKafkaRecord processor today and will come up with a solution to use it. I will feedback on what I find.

In regards to our current flow configuration, we have the following high-level process groups:

• ConsumeData - Consumes the data from Kafka, adds some additional attributes, and converts from Avro to JSON.

• TransformData - This is where the majority of the work happens. The flow file gets routed on an attribute to a sub-process group based on the type of data it is (this is decided based on the kafka.topic attribute), where the processing happens (explained in more detail below).

• ProduceData - Publishes the record back to Kafka using the PublishKafkaRecord processor, adds some attributes to calculate the processing duration, and logs this out.

• Error Handling - A general catch-all. Luckily we don’t see this triggered very often, but it logs a message and writes the file to disk.

<image001.png>

ConsumeData process group:

• ConsumeKafka, as mentioned in earlier emails, consumes the messages from multiple Kafka topics with a Message Demarcator of an empty string and Max Poll Records set to 2,000.

• The Consumed Flows queue uses Round Robin load balancing, as we have found one node is favoured for the consumption from Kafka and so wanted to distribute the load.

• AddAttributes uses UpdateAttribute to add additional information to the flow file (Data Type, Output Kafka Topic) based on the kafka.topic attribute on the flow file.

• Finally, ConvertRecord converts the content of the flow file from Avro to JSON using the AvroReader and JsonRecordSetWriter controller services. In addition, we use the inbuilt AvroSchemaRegistry.

<image002.png>

TransformData process group. Due to the size, it’s not easy to show in a screenshot.

• Flow files are routed based on attribute to 1 of 10 processing groups (based on the kafka.topic attribute).

• Each data type has its own processing group; I have gone into detail on one below.

• There are two failure output ports: one for the initial routing issues, another for any errors that have happened in the sub-processing groups. There is also the Successful output port.


<image003.png>


HTTP Data Type Sub-processing group (Inside TransformData):

• The sub-processing groups all follow a somewhat similar layout but have different custom processors after the initial Jolt transformation.

• The Jolt transformation does the majority of the heavy lifting in converting the message into the right format to match the output Avro schema requirements. The Jolt spec contains a chain of operations including modify-overwrite-beta, shift, and default.

• Each processor after the Jolt transformation does something different to meet a specific requirement not possible with other NiFi processors. Some, like the IPAddressProcessor, are reused in processing for other data types. We are planning to move these into a single post-processing group where all data types are routed before being published to Kafka.

<image004.png>

Produce Data, produces the messages back to different Kafka topics which have been defined in the output.topic (set in the consume data stage)

• PublishKafkaRecord converts the message from JSON back to Avro (different schema to input) using the JsonTreeReader and AvroRecordSetWriter controller services. We have set it to Guarantee Single Node Delivery, and use Snappy compression.

• Set Exit Attributes adds a final processing time to the flow file.

• Calculate Duration uses this to work out the overall processing time.

• Then it’s finally logged to file so we can analyse the full processing time.

<image005.png>

Below are the scheduling settings we are using for processors:

Group         | Processor           | Concurrent Tasks | Run Duration | Yield Duration | Other
--------------+---------------------+------------------+--------------+----------------+------
Consume Data  | ConsumeKafka        | 12               | 0            | 1 ms           | Message Demarcator = "Empty string set"; Max Poll Records = 2,000
              | AddAttributes       | 10               | 0            | 1 sec          |
              | ConvertRecord       | 10               | 0            | 1 sec          |
TransformData | RouteOnAttribute    | 10               | 0            | 1 sec          |
              | HTTP Jolt           | 5                | 0            | 200 ms         |
              | HTTP Post Processor | 2                | 0            | 1 sec          |
Produce Data  | PublishKafka        | 10               | 0            | 1 sec          | Max Request Size = 1 MB; Compression Type = snappy; batch.size = 80,000 and linger.ms = 50,000 (both based on some custom producers we have written in the past)
              | Set Exit Attributes | 10               | 0            | 1 sec          |
              | Calculate Duration  | 10               | 0            | 1 sec          |
              | Log Success         | 10               | 0            | 1 sec          |

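The Queue configs are as follows:

Queue                                 | Back Pressure Object Threshold | Back Pressure Size Threshold | Load Balance
--------------------------------------+--------------------------------+------------------------------+-------------
After ConsumeKafka                    | 20,000                         | 1 GB                         | Round Robin
After Route On Attribute              | 10,000                         | 1 GB                         | No
All Other Queues In TransformData     | 10,000                         | 1 GB                         | No
All Failure Routes                    | 10,000                         | 1 GB                         | No
Between ConsumeData And TransformData | 20,000                         | 1 GB                         | No
Between TransformData And ProduceData | 20,000                         | 1 GB                         | No
All Other Queues In ConsumeData       | 20,000                         | 1 GB                         | No
Before Route On Attribute             | 20,000                         | 1 GB                         | No
All Queues In ProduceData             | 20,000                         | 1 GB                         | No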

We also have a Maximum Timer Driven Thread Count of 200 set.

Stateless NiFi would fit this perfectly, but from what I have read, it’s not available in v1.9.2? We are stuck on 1.9.2 as we are using the Zookeeper provided as part of a Cloudera cluster. However, I’m wondering if we would be better suited to the volatile repositories, than writing to disk?

Thanks again for the advice so far Joe, you’ve given us some confidence it’s us doing something wrong and not something with NiFi.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com<ma...@gmail.com>]
Sent: 23 September 2020 19:05
To: users@nifi.apache.org<ma...@nifi.apache.org>
Subject: Re: NiFi V1.9.2 Performance

Nathan

Not sure what read/write rates you'll get in these RAID-10 configs but generally this seems like it should be fine (100s of MB/sec per node range at least).  Whereas now you're seeing about 20MB/sec/node.  This is definitely very low.
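For reference, the per-node figure can be worked out from the numbers in the original message. The Python sketch below assumes decimal units (1 GB = 1000 MB) and that the load is spread evenly over the three nodes; the 5x target is the one stated in the original message.

```python
WINDOW_SECONDS = 5 * 60  # figures were measured over 5 minutes
NODES = 3


def per_node_mb_per_sec(total_gb, nodes=NODES, seconds=WINDOW_SECONDS):
    # Assumes 1 GB = 1000 MB and an even spread across nodes.
    return total_gb * 1000 / seconds / nodes


consumed_compressed = per_node_mb_per_sec(9.48)
consumed_uncompressed = per_node_mb_per_sec(38.53)
produced = per_node_mb_per_sec(16.84)
target = 5 * produced  # the "5x" goal from the original message

print(f"consumed (compressed):   {consumed_compressed:.1f} MB/sec/node")
print(f"consumed (uncompressed): {consumed_uncompressed:.1f} MB/sec/node")
print(f"produced:                {produced:.1f} MB/sec/node")
print(f"5x target:               {target:.1f} MB/sec/node")
```

The produced figure comes out a little under 19 MB/sec per node, in line with the rough 20MB/sec/node estimate, and the 5x target lands just under 100 MB/sec per node.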

If you review http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html then you'll see that we do actually capture attributes such as kafka.topic and so on.  Flowfiles would also be properly grouped by that.  What I'm not positive of is whether it could handle reading from multiple topics at the same time while also honoring and determining each of their distinct schemas.  Would need to test/verify that scenario to be sure.  If you do have a bunch of topics and they could grow/change then keeping this single processor approach makes sense.  If you can go the route of one ConsumeKafkaRecord per topic then obviously that would work well.

Not seeing your flow though I cannot be certain where the bottleneck(s) exist and provide guidance.  But this is without a doubt a vital skill to achieving maximum performance.

You'd have to show/share a ton more details for folks here to be helpful in walking through the full design.  Or explain the end to end flow.

As an additional food for thought if the flows are indeed 'from kafka -> do stuff -> back to kafka' this is likely a great use case for stateless-nifi.

Thanks

On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com>> wrote:
Hi Joe,

Thanks for getting back to me so quickly.

Our disk setup is as follows:

Path     | Storage Type                       | Format | Capacity | Content
---------+------------------------------------+--------+----------+--------
/        | 100GB OS SSD                       | ext4   | 89.9GB   | OS, NiFi install, Logs
/data/1/ | 2 x 4TB SAS Hard Drives in RAID 1  | ext4   | 3.7TB    | Database and Flowfile Repos
/data/2/ | 8 x 4TB SAS Hard Drives in RAID 10 | ext4   | 14.6TB   | Content Repo
/data/3/ | 2 x 4TB SAS Hard Drives in RAID 1  | ext4   | 3.7TB    | Provenance Repo
/ssd     | 1 x 4TB PCIe NVMe SSD              | ext4   | 3.7TB    | Content Repo (used instead of /data/2/ as a test, to see if CPU was bottlenecked by disk operations)

I will certainly take a look at those. One question with the consume record processor is how I would consume from multiple topics and ensure the correct Avro schema is used to deserialise the message? We have a 1:1 mapping of schemas to topics. At the moment the ConsumeKafka processor is reading from all topics in one consumer. I’m assuming the kafka.topic attribute doesn’t exist at this stage? We use the Avro Schema Registry Controller as we don’t have a schema registry in place yet.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com<ma...@gmail.com>]
Sent: 23 September 2020 17:33
To: users@nifi.apache.org<ma...@nifi.apache.org>
Subject: Re: NiFi V1.9.2 Performance

Nathan

You have plenty powerful machines to hit super high speeds, but what I cannot tell is how the disks are set up, capability- and layout-wise, relative to our three repos of importance.  You'll need to share those details.

That said, the design of the flow matters.  The Kafka processors that aren't record oriented will perform poorly unless they're acquiring data in their natural batches as they arrive from kafka.  In short, use the record oriented processors from Kafka.  In it you can even deal with the fact you want to go from AVRO to Json and so on.  These processors have a tougher learning curve but they perform extremely well and we have powerful processors to go along with them for common patterns.

You absolutely should be able to get to the big numbers you have seen.  It requires great flow design (powerful machines are secondary).

Thanks

Re: NiFi V1.9.2 Performance

Posted by Matt Burgess <ma...@apache.org>.
Nathan,

I recommend avoiding the "default" spec for performance purposes; the JOLT
maintainer(s) themselves say 'The "default" transform is dated and clunky,
as it has not been refactored and curated like Shiftr has.' You may be
able to put the default values as "literals" in the shift spec, something
like a key "#${kafka.partition}" with the value of whatever field name you
want. It's a little backwards but seems to work. Alternatively, try
UpdateRecord instead of JoltTransformJSON. I think it will infer the new
schema (with the additional field), but I haven't tried it.
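As a sketch of the "literal in the shift spec" idea, the Python below builds both variants as plain data and prints the shift-only one as JSON, ready to paste into the processor's Jolt Specification property. The field name source_system and the value unknown are hypothetical placeholders, and the specs are assumptions about what such a chain could look like, not the spec from this flow.

```python
import json

# Variant using the "default" operation (the one recommended against above).
default_style = [
    {"operation": "shift", "spec": {"*": "&"}},
    {"operation": "default", "spec": {"source_system": "unknown"}},
]

# Variant with the literal placed directly in the shift spec:
# "*": "&" copies every top-level field through unchanged, and a key
# starting with "#" writes that literal text to the named output field.
literal_in_shift = [
    {"operation": "shift", "spec": {"*": "&", "#unknown": "source_system"}},
]

print(json.dumps(literal_in_shift, indent=2))
```

The two are not strictly equivalent: as far as I understand Jolt, "default" only fills a field that is missing, whereas the "#" literal is always written, so this only fits fields that are never present on the input.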

Regards,
Matt


On Fri, Oct 2, 2020 at 3:46 AM <na...@bt.com> wrote:

> Hi All,
>
>
>
> We’ve continued with testing during the week and went back to a cluster
> without SSDs, and reset back to where we were before sending this email out
> (No Record based Processors).
>
>
>
> Interestingly we think we may have found our issue with JOLT. We haven’t
> thoroughly tested this yet, but based on a post we found on the Cloudera
> forums [1], we removed the default operation from the Jolt Spec in the
> JoltTransformJSON processor for one of the ten transformations. Any backlog
> in the queues we once saw on that Jolt transform has gone. It’s gone from
> what was roughly half a second to 131ms, and this surprised us. Am I correct
> in assuming this is a JOLT issue rather than a NiFi processor issue?
>
>
>
> Unfortunately, we’ve got to put NiFi testing off to one side for the next
> few weeks. However, during this time we should be making the jump to
> v1.11.4 of NiFi as part of Cloudera installs.
>
>
>
> Once we have tested this further, I will share any results or steps taken
> to improve the throughput.
>
>
>
> Thanks for all the assistance!
>
>
>
> Nathan
>
>
>
> [1] -
> https://community.cloudera.com/t5/Support-Questions/JoltTransformJSON-performance-issue-when-using-default/td-p/226827
>
>
>
> *From:* English,N,Nathan,VIR R
> *Sent:* 28 September 2020 14:12
> *To:* users@nifi.apache.org
> *Subject:* RE: NiFi V1.9.2 Performance
>
>
>
> Hi Eric,
>
>
>
> Thanks for your suggestion. I have tried the volatile provenance repo
> today, and it did seem to make the cluster very unstable. I also had to
> increase the heap to 128G to make it even remotely testable. CPU load was
> above 40 on all nodes at the same volume of traffic as discussed beforehand.
>
>
>
> Before that, I had switched out all Kafka Consumers to the Kafka Record
> Consumer processors (1 per topic). I have also replaced all the Jolt
> Transform processors with the Jolt Transform Record processors. Performance
> and CPU load remained relatively consistent (~32 CPU load) compared to the
> previous testing when using the standard processors (without Volatile Prov
> Repo). The bottlenecks still seem to be on Jolt, but I’m reluctant to up
> any of the concurrent tasks any higher as we are so high on CPU load.
>
>
>
> I’m going to spend the afternoon comparing the nifi.properties files we
> have on the nodes to the default to see if there is anything obvious that
> would cause the high CPU.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Eric O'Reilly [mailto:eric.oreilly@onyxpoint.com
> <er...@onyxpoint.com>]
> *Sent:* 28 September 2020 04:27
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan,
>
> Hopefully, this finds your issues resolved. But if not, I do have another
> direction to investigate. In my experience, it's not enough to just move
> your content repo to SSD. Generally, I only see a gain if both the
> provenance repo and the content repo are on the same speed disk. But I
> might be biased as my flows take in high volumes and have a high number of
> provenance inserts to process. You can test this (if you're comfortable not
> recording your provenance for a while), by switching the
> implementation method of your provenance repository to volatile (see here
> for more details -
> https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#provenance-repository).
> It basically just keeps a set number of records in memory (see here on how
> to configure -
> https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#volatile-provenance-repository-properties),
> which removes any potential disk bottlenecks for the prov repo. I wouldn't
> want to run a production cluster like this and keep in mind it will impact
> your heap space (especially if you make the number of buffered records
> high), but it would be a starting point to determine if your bottleneck
> is related to disk IO for the provenance repository storage.
>
> v/r,
>
> Eric O'Reilly
>
>
>
> On Fri, Sep 25, 2020 at 7:10 AM <na...@bt.com> wrote:
>
> Hi Matt,
>
>
>
> We’ve now switched to the JoltTransformRecord processors. It does seem to
> be slightly better performing.
>
>
>
> We are trying to switch out the ConsumeKafka_0_10 and ConvertRecord
> processors to the ConsumeKafkaRecord_0_10 processor based on feedback in
> this chain as well.
>
>
>
> With the ConvertRecord processor, we used the kafka.topic attribute as it
> was available because the ConsumeKafka processor had completed on the flow
> file. The ConsumeKafkaRecord handles it all in one. When testing, it failed
> to parse as kafka.topic is not set.
>
>
>
> As we are consuming from multiple topics each with a unique Avro schema
> the only way I can see to do this would be to have a separate processor and
> AvroReader controller service for each topic?
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
>
>
> *From:* Matt Burgess [mailto:mattyb149@apache.org]
> *Sent:* 24 September 2020 17:08
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan,
>
>
>
> If you have multiple JSON messages in one flow file, is it in one large
> array, or a top-level JSON object with an array inside? Also are you trying
> to transform each message or the whole thing (i.e. do you need to know
> about more than one message at a time)? If you have a top-level array and
> are transforming each element in the array, you might get better
> performance out of JoltTransformRecord rather than JoltTransformJSON, as
> the latter reads the entire file into memory as a single JSON entity. If
> you have a top-level object then they will both read the whole thing in.
>
>
>
> Regards,
>
> Matt
>
>
>
>
>
> On Thu, Sep 24, 2020 at 10:25 AM <na...@bt.com> wrote:
>
> Hi Mark,
>
>
>
> From what I can see (based on queues building before the processor and
> basically to empty after) it is the Jolt processor we have problems with.
> We’ve have tried to add more concurrency, reduce the run schedule and
> increasing the duration, but it didn’t seem to resolve the high CPU load
> (~32 when processing at the rates described in my first email, when no
> traffic is processing it sits at 0.2).
>
>
>
> It could be the completely wrong way of diagnosing this! I’ve struggled to
> find information (Apart from your great videos) to assist in getting to the
> bottom of it.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Mark Payne [mailto:markap14@hotmail.com]
> *Sent:* 24 September 2020 15:12
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Hey Nathan,
>
>
>
> A quick clarification - is the bottleneck / the slow point in your flow
> actually consuming from Kafka or Jolt? From your original message it sounds
> like the bottleneck may actually be the Jolt Transform processor?
>
>
>
> If the problem is in the ConsumeKafka processor, one thing you’ll want to
> look at is in the Settings tab, set the Yield Duration to “0 sec”. That can
> make a huge difference in performance from Kafka processors.
>
>
>
> Thanks
>
> -Mark
>
>
>
>
>
> On Sep 24, 2020, at 10:07 AM, nathan.english@bt.com wrote:
>
>
>
> Hi Bryan,
>
>
>
> Thanks for this. My understanding of the concurrent tasks was incorrect. I
> thought it was across the whole cluster, not per node.
>
>
>
> I did spend some time looking at the code for the demarcator as we had
> issues getting it batching. I think there may be a slight misunderstanding
> between my description and how it sounds.
>
>
>
> When I say an Empty string, the message demarcator isn’t blank. I have
> used the checkbox ‘Set Empty String’, which means the processor treats the
> field as Null (From memory). If I left the field empty (checkbox not
> selected), it was one Kafka message to one flow file, which was a massive
> bottleneck.
>
>
>
> I also seem to remember, from when I looked at the code, that the
> ConsumeKafkaRecord processors default the demarcator to null.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Bryan Bende [mailto:bbende@gmail.com]
> *Sent:* 24 September 2020 14:54
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Regarding the batching, I would have to double check the code, but since
> you said the demarcator is empty string, I think that means it is not
> batching and putting one message to one flow file. Basically if a
> demarcator is not set then batch size is ignored.
>
>
>
> Regarding the processors/tasks, let's take one topic with 11 partitions as
> an example. If you make a consumer processor for this topic with 1
> concurrent task, then you have 3 instances of this processor since you have
> a 3 node cluster, so you might end up with something like this...
>
>
>
> node 1 - ConsumeKafka
>
>   Task 1 - 4 partitions
>
>
>
> node 2 - ConsumeKafka
>
>   Task 1 - 4 partitions
>
>
>
> node 3 - ConsumeKafka
>
>   Task 1 - 3 partitions
>
>
>
> It may not be exactly like that, but just an example as to how it should
> be assigned.
>
>
>
> To add more parallelism you could then increase concurrent tasks up to
> maybe 4 and you get something like this...
>
>
>
> node 1 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - 1 partition
>
>
>
> node 2 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - 1 partition
>
>
>
> node 3 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - nothing
>
>
>
> If you go higher than 4 concurrent tasks you will just end up creating
> more consumers than partitions, and there is nothing to assign them.
>
>
>
>
>
> On Thu, Sep 24, 2020 at 9:30 AM <na...@bt.com> wrote:
>
> Hi Bryan,
>
>
>
> We have configured the processor to read in a maximum batch size of 2k
> messages, which does seem to result in more than one Kafka message in the
> flow file.
>
>
>
> Completely understand on the Load balancing, we tried several iterations
> of 1 task to one topic partition. However, we still found it to be loaded
> towards one specific node. I will try splitting it into multiple processors
> to see if this handles it any better. We have 10 topics with 11 partitions.
> (one topic with 2 partitions). So I should set all concurrent tasks to 1
> with multiple processors (One processor per topic)?
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
> *From:* Bryan Bende [mailto:bbende@gmail.com]
> *Sent:* 24 September 2020 13:59
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> I'm not sure how many topics you have, but the biggest improvement would
> probably be to not do a single message per flow file. You want to batch
> together lots of messages coming out of ConsumeKafka using a demarcator,
> and then convert/transform them in batches. You may need to have a separate
> consumer processor for each topic in order to do this correctly.
>
>
>
> You generally shouldn't need to use a load balanced connection after
> ConsumeKafka. If one node is being favored it is likely that the number of
> partitions in your topic is not lining up with the # of nifi nodes X # of
> concurrent consumer tasks. In the simplest case, if your topic had one
> partition, and you have 3 NiFi nodes with ConsumeKafka with 1 concurrent
> task each, then all your messages will only get consumed on one of the
> nodes. If you have 3 partitions then it should be roughly equal. In your
> case you have 3 nodes with concurrent tasks set to 12, so you have
> potentially 36 consumers, which means if you have anything less than 36
> partitions then it is not going to be balanced equally.
>
>
>
> On Thu, Sep 24, 2020 at 3:54 AM <na...@bt.com> wrote:
>
> Hi Joe,
>
>
>
> The Raids seem to give us a good IOPS number when we’ve tried testing
> them. We have seen a 300ms wait time on the Content Repo, hence why we have
> tried SSDs for the content repo as we assumed that was the bottleneck. The
> other Raids seemed OK to us.
>
>
>
> I will certainly look into the ConsumeKafkaRecord processor today and will
> come up with a solution to use it. I will feedback on what I find.
>
>
>
> In regards to our current flow configuration, we have the following
> high-level process groups:
>
> ·         ConsumeData -  Consumes the data from Kafka, Add some
> additional attributes, Converts to JSON from Avro
>
> ·         TransformData – This is where the majority of the work happens.
> Each flow file is routed on an attribute to a sub-process group for its
> type of data (decided by the kafka.topic attribute), where the processing
> happens (explained in more detail below)
>
> ·         ProduceData – Publishes the Record back to Kafka using the
> PublishKafkaRecord processor, add some attributes to calculate processing
> duration it took to process and logs this out.
>
> ·         Error Handling – A General Catch all. Luckily we don’t see this
> triggered very often, but it logs a message and writes the file to disk.
>
> <image001.png>
>
>
>
> *ConsumeData* process group:
>
> ·         ConsumeKafka, as mentioned in earlier emails, consumes the
> messages from multiple Kafka Topics with a Message Demarcator of an empty
> string and the Max poll Records set at 2,000.
>
> ·         The Consumed Flows queue uses RoundRobin load balancing as we
> have found one node is favoured for the consumption from Kafka so wanted to
> distribute the load.
>
> ·         AddAttributes uses the UpdateAttribute to add additional
> information to the flow file (Data Type, Output Kafka Topic) based on the
> Kafka.topic attribute on the flow file.
>
> ·         Finally, ConvertRecord converts the content of the flow file
> from Avro to JSON using the AvroReader and JsonRecordSetWriter Controller
> Services. In addition, we use the inbuilt AvroSchemaRegistry.
>
> <image002.png>
>
>
>
> *TransformData* process group. Due to the size, it’s not easy to show in
> a screenshot.
>
> ·         Flow files are routed based on attribute to 1 of 10 processing
> groups (based on the kafka.topic attribute)
>
> ·         Each Data Type has its own processing group; I have gone into
> detail on one below
>
> ·         There are two failure output ports: one for the initial routing
> issues, another for any errors that have happened in the sub-processing
> groups. There is also the Successful output port.
>
>
>
> <image003.png>
>
>
>
>
>
> *HTTP Data Type Sub-processing group (Inside TransformData):*
>
> ·         The sub-processing groups all follow a somewhat similar layout
> but have different custom processors after the initial Jolt Transformation
>
> ·         The Jolt Transformation does the majority of the heavy lifting
> in converting the message into the right format to match the output Avro
> schema requirements. The Jolt Spec contains a chain of operations including
> modify-overwrite-beta, shift, and default
>
> ·         Each processor after the Jolt transformation does something
> different to meet a specific requirement not possible with other NiFi
> processors. Some like the IPAddressProcessor are reused in processing for
> other data types. We are planning to move into a single post-processing
> group where all data types are routed, before being published to Kafka.
>
> <image004.png>
>
>
>
> *Produce Data*, produces the messages back to different Kafka topics
> which have been defined in the output.topic (set in the consume data stage)
>
> ·         PublishKafkaRecord converts the message from JSON back to Avro
> (Different schema to input) using the JSONTreeReader and
> AvroRecordSetWriter Controller services. We have set it to Guarantee Single
> Node Delivery, and use Snappy Compression.
>
> ·         Set Exit Attributes adds a Final Processing time to the flow
> file
>
> ·         The Calculate duration uses this to work out its overall time
> processing
>
> ·         Then it’s finally logged to file so we can analyse the full
> processing time.
>
> <image005.png>
>
>
>
> Below are the scheduling settings we are using for processors:
>
> Group         | Processor           | Concurrent Tasks | Run Duration | Yield Duration | Other
> --------------+---------------------+------------------+--------------+----------------+------
> Consume Data  | ConsumeKafka        | 12               | 0            | 1 ms           | Message Demarcator = "Empty string set"
>               |                     |                  |              |                | Max Poll Records = 2,000
>               | AddAttributes       | 10               | 0            | 1 s            |
>               | ConvertRecord       | 10               | 0            | 1 s            |
> TransformData | RouteOnAttribute    | 10               | 0            | 1 s            |
>               | HTTP Jolt           | 5                | 0            | 200 ms         |
>               | HTTP Post Processor | 2                | 0            | 1 s            |
> Produce Data  | PublishKafka        | 10               | 0            | 1 s            | Max Request Size = 1 MB
>               |                     |                  |              |                | Compression Type = snappy
>               |                     |                  |              |                | batch.size = 80,000 (Based on some custom producers we have written in the past)
>               |                     |                  |              |                | linger.ms = 50,000 (Based on some custom producers we have written in the past)
>               | Set Exit Attributes | 10               | 0            | 1 s            |
>               | Calculate Duration  | 10               | 0            | 1 s            |
>               | Log Success         | 10               | 0            | 1 s            |
>
>
>
> The Queue configs are as follows:
>
>
>
> Queue                                 | Back Pressure Object Threshold | Back Pressure Size Threshold | Load Balance
> --------------------------------------+--------------------------------+------------------------------+-------------
> After ConsumeKafka                    | 20,000                         | 1 GB                         | Round Robin
> After Route On Attribute              | 10,000                         | 1 GB                         | No
> All Other Queues In TransformData     | 10,000                         | 1 GB                         | No
> All Failure Routes                    | 10,000                         | 1 GB                         | No
> Between ConsumerData And TransformData | 20,000                        | 1 GB                         | No
> Between TransformData And ProduceData | 20,000                         | 1 GB                         | No
> All Other Queues In ConsumeData       | 20,000                         | 1 GB                         | No
> Before Route On Attribute             | 20,000                         | 1 GB                         | No
> All Queues In ProduceData             | 20,000                         | 1 GB                         | No
>
>
>
> We also have a Maximum Timer Driven Thread Count of 200 set.
>
>
>
> Stateless NiFi would fit this perfectly, but from what I have read, it’s
> not available in v1.9.2? We are stuck on 1.9.2 as we are using the
> Zookeeper provided as part of a Cloudera cluster. However, I’m wondering if
> we would be better suited to the volatile repositories, than writing to
> disk?
>
>
>
> Thanks again for the advice so far Joe, you’ve given us some confidence
> it’s us doing something wrong and not something with NiFi.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Joe Witt [mailto:joe.witt@gmail.com]
> *Sent:* 23 September 2020 19:05
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan
>
>
>
> Not sure what read/write rates you'll get in these RAID-10 configs but
> generally this seems like it should be fine (100s of MB/sec per node range
> at least).  Whereas now you're seeing about 20MB/sec/node.  This is
> definitely very low.
>
>
>
> If you review
> http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html
>  then you'll see that we do actually capture attributes such as
> kafka.topic and so on.  Flowfiles would also be properly grouped by that.
> What I'm not positive of is whether it could handle reading from multiple topics at
> the same time while also honoring and determining each of their distinct
> schemas.  Would need to test/verify that scenario to be sure.  If you do
> have a bunch of topics and they could grow/change then keeping this single
> processor approach makes sense.  If you can go the route of one
> ConsumeKafkaRecord per topic then obviously that would work well.
>
>
>
> Not seeing your flow though I cannot be certain where the bottleneck(s)
> exist and provide guidance.  But this is without a doubt a vital skill to
> achieving maximum performance.
>
>
>
> You'd have to show/share a ton more details for folks here to be helpful
> in walking through the full design.  Or explain the end to end flow.
>
>
>
> As an additional food for thought if the flows are indeed 'from kafka ->
> do stuff -> back to kafka' this is likely a great use case for
> stateless-nifi.
>
>
>
> Thanks
>
>
>
> On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com> wrote:
>
> Hi Joe,
>
>
>
> Thanks for getting back to me so quickly.
>
>
>
> Our disk setup is as follows:
>
> Path     | Storage Type                       | Format | Capacity | Content
> ---------+------------------------------------+--------+----------+--------
> /        | 100GB OS SSD                       | ext4   | 89.9GB   | OS, NiFi install, Logs
> /data/1/ | 2 x 4TB SAS Hard Drives in RAID 1  | ext4   | 3.7TB    | Database and Flowfile Repos
> /data/2/ | 8 x 4TB SAS Hard Drives in RAID 10 | ext4   | 14.6TB   | Content Repo
> /data/3/ | 2 x 4TB SAS Hard Drives in RAID 1  | ext4   | 3.7TB    | Provenance Repo
> /ssd     | 1 x 4TB PCIe NVMe SSD              | ext4   | 3.7TB    | Content Repo (used instead of /data/2/ as a test, to see if CPU was bottlenecked by disk operations)
>
>
>
> I will certainly take a look at those. One question with the consume
> record processor is how I would consume from multiple topics and ensure the
> correct Avro schema is used to deserialise the message? We have 1:1 mapping
> of schemas to topics. At the moment the ConsumeKafka processor is reading
> from all topics in one consumer. I’m assuming the attribute kafka.topic
> attribute doesn’t exist at this stage? We use the Avro Schema Registry
> Controller as we don’t have a schema registry in place yet.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Joe Witt [mailto:joe.witt@gmail.com]
> *Sent:* 23 September 2020 17:33
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan
>
>
>
> You have plenty powerful machines to hit super high speeds but what I
> cannot tell is how the disks are setup/capability and layout wise and
> relative to our three repos of importance.  You'll need to share those
> details.
>
>
>
> That said, the design of the flow matters.  The Kafka processors that
> aren't record oriented will perform poorly unless they're acquiring data in
> their natural batches as they arrive from kafka.  In short, use the record
> oriented processors from Kafka.  In it you can even deal with the fact you
> want to go from AVRO to Json and so on.  These processors have a tougher
> learning curve but they perform extremely well and we have powerful
> processors to go along with them for common patterns.
>
>
>
> You absolutely should be able to get to the big numbers you have seen.  It
> requires great flow design (powerful machines are secondary).
>
>
>
> Thanks
>
>
>
> On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com> wrote:
>
> Hi All,
>
>
>
> We’ve got a NiFi 3 Node Cluster running on 3 x 40 CPU, 256GB RAM (32G Java
> Heap) servers. However, we have only been able to achieve a consumption of
> ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with a
> production rate of ~16.84GB out of the cluster over 5 mins. This is much
> lower than we were expecting based on what we have read. With this
> throughput we see a CPU load of ~32 on all nodes, so we know there isn’t
> much else we can get out of the CPU.
>
>
>
> We have also tried SSDs, Raided and Unraided HDDs for the content repo
> storage, but they haven’t made a difference to the amount we can process.
>
>
>
> The process is as follows:
>
> 1.       Our flow reads from Kafka Compressed (Maximum of 2000 records
> per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 -->
> UpdateAttribute --> ConvertRecord)
>
> 2.       Depending on which topic the flow file is consumed from, we then
> send the message to one of 10 potential process groups, each containing
> between 3 and 5 processors within the process groups. (RouteOnAttribute --> Relevant
> Processing Group containing JoltTransformJSON and several custom processors
> we have made).
>
> 3.       Finally, we produce the flow file content back to one of several
> Kafka topics, based on the input topic name in Avro format with Snappy
> compression on the Kafka topic.
>
>
>
> Inspecting the queued message counts, it indicates that the Jolt
> Transforms are taking the time to process (Large queues before JOLT
> processors, small or no queues afterwards). But I’m not sure why this is
> any worse than the rest of the processors as the event duration is less
> than a second when inspecting in provenance? We have tuned the number of
> concurrent tasks, duration and schedules to get the performance we have so
> far.
>
>
>
> I’m not sure if there is anything anyone could recommend or suggest to try
> and make improvements? We need to achieve a rate around 5x of what it’s
> currently processing with the same number of nodes. We are running out of
> ideas on how to accomplish this and may have to consider alternatives.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
>

RE: NiFi V1.9.2 Performance

Posted by na...@bt.com.
Hi All,

We’ve continued with testing during the week and went back to a cluster without SSDs, and reset back to where we were before sending this email out (No Record based Processors).

Interestingly, we think we may have found our issue with Jolt. We haven’t thoroughly tested this yet, but for one of the ten transformations we removed the default operation from the Jolt Spec in the JoltTransformJSON processor, based on a post we found on the Cloudera forums [1]. The backlog we once saw in the queue before that Jolt transform has gone. The transform has gone from roughly half a second to 131ms, which surprised us. Am I correct in assuming this is a Jolt issue rather than a NiFi processor issue?
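
As a rough illustration of the change described above, here is a minimal Python sketch that drops the default operation from a Jolt chain spec. The spec shown is hypothetical (the field names and values are invented, not taken from our flow); only the general chain layout, a JSON array of steps each with an "operation" and a "spec", is assumed. Removing default also removes whatever fallback values it supplied, so those would need handling elsewhere.

```python
import json

# Hypothetical chain spec: the field names and values are invented for
# illustration and are not taken from the flow discussed in this thread.
SPEC_JSON = """
[
  {"operation": "modify-overwrite-beta", "spec": {"host": "=toLower(@(1,host))"}},
  {"operation": "shift", "spec": {"host": "http.host", "uri": "http.uri"}},
  {"operation": "default", "spec": {"http": {"status": "unknown"}}}
]
"""


def drop_operation(chain, name):
    """Return a copy of the chain without any step whose operation is `name`."""
    return [step for step in chain if step.get("operation") != name]


chain = json.loads(SPEC_JSON)
trimmed = drop_operation(chain, "default")

# The trimmed spec is what would be pasted back into the processor.
print(json.dumps(trimmed, indent=2))
```

The trimmed chain keeps the modify-overwrite-beta and shift steps in their original order.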

Unfortunately, we’ve got to put NiFi testing off to one side for the next few weeks. However, during this time we should be making the jump to v1.11.4 of NiFi as part of Cloudera installs.

Once we have tested this further, I will share any results or steps taken to improve the throughput.

Thanks for all the assistance!

Nathan

[1] - https://community.cloudera.com/t5/Support-Questions/JoltTransformJSON-performance-issue-when-using-default/td-p/226827

From: English,N,Nathan,VIR R
Sent: 28 September 2020 14:12
To: users@nifi.apache.org
Subject: RE: NiFi V1.9.2 Performance

Hi Eric,

Thanks for your suggestion. I have tried the volatile provenance repo today, and it did seem to make the cluster very unstable. I also had to increase the heap to 128G to make it even remotely testable. CPU load was above 40 on all nodes at the same volume of traffic as discussed beforehand.

Before that, I had switched out all Kafka Consumers to the Kafka Record Consumer processors (1 per topic). I have also replaced all the Jolt Transform processors with the Jolt Transform Record processors. Performance and CPU load remained relatively consistent (~32 CPU load) compared to the previous testing when using the standard processors (without Volatile Prov Repo). The bottlenecks still seem to be on Jolt, but I’m reluctant to raise any of the concurrent tasks further as we are so high on CPU load.

I’m going to spend the afternoon comparing the nifi.properties files we have on the nodes to the default to see if there is anything obvious that would cause the high CPU.

Kind Regards,

Nathan

From: Eric O'Reilly [mailto:eric.oreilly@onyxpoint.com]
Sent: 28 September 2020 04:27
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan,

Hopefully, this finds your issues resolved. But if not, I do have another direction to investigate. In my experience, it's not enough to just move your content repo to SSD. Generally, I only see a gain if both the provenance repo and the content repo are on the same speed disk. But I might be biased as my flows take in high volumes and have a high number of provenance inserts to process. You can test this (if you're comfortable not recording your provenance for a while) by switching the implementation of your provenance repository to volatile (see here for more details - https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#provenance-repository). It basically just keeps a set number of records in memory (see here on how to configure - https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#volatile-provenance-repository-properties), which removes any potential disk bottlenecks for the prov repo. I wouldn't want to run a production cluster like this, and keep in mind it will impact your heap space (especially if you make the number of buffered records high), but it would be a starting point to determine if your bottleneck is related to disk IO for the provenance repository storage.

v/r,

Eric O'Reilly

On Fri, Sep 25, 2020 at 7:10 AM <na...@bt.com> wrote:
Hi Matt,

We’ve now switched to the JoltTransformRecord processors. It does seem to be slightly better performing.

We are trying to switch out the ConsumeKafka_0_10 and ConvertRecord processors to the ConsumeKafkaRecord_0_10 processor based on feedback in this chain as well.

With the ConvertRecord processor, we used the kafka.topic attribute as it was available because the ConsumeKafka processor had completed on the flow file. The ConsumeKafkaRecord handles it all in one. When testing, it failed to parse as kafka.topic is not set.

As we are consuming from multiple topics each with a unique Avro schema the only way I can see to do this would be to have a separate processor and AvroReader controller service for each topic?

Kind Regards,

Nathan


From: Matt Burgess [mailto:mattyb149@apache.org]
Sent: 24 September 2020 17:08
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan,

If you have multiple JSON messages in one flow file, is it in one large array, or a top-level JSON object with an array inside? Also are you trying to transform each message or the whole thing (i.e. do you need to know about more than one message at a time)? If you have a top-level array and are transforming each element in the array, you might get better performance out of JoltTransformRecord rather than JoltTransformJSON, as the latter reads the entire file into memory as a single JSON entity. If you have a top-level object then they will both read the whole thing in.
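
To make the distinction concrete, here is a small Python sketch (not NiFi code) contrasting the two styles: parsing the whole payload as one JSON entity versus parsing and transforming one record at a time. The `transform` function is a hypothetical stand-in for a per-message Jolt spec, and newline-delimited JSON is assumed for the per-record case purely to keep the example short.

```python
import json


def transform(message):
    """Stand-in for a per-message Jolt spec (hypothetical field names)."""
    return {"http": {"host": message.get("host"), "uri": message.get("uri")}}


def transform_whole_document(payload):
    """Parse the entire payload as one JSON array, then transform each element."""
    messages = json.loads(payload)  # the whole document is held in memory at once
    return [transform(m) for m in messages]


def transform_per_record(lines):
    """Parse and transform one record at a time (newline-delimited JSON here)."""
    for line in lines:
        if line.strip():
            yield transform(json.loads(line))


records = [{"host": "a.example", "uri": "/x"}, {"host": "b.example", "uri": "/y"}]
as_array = json.dumps(records)
as_lines = [json.dumps(r) for r in records]

whole = transform_whole_document(as_array)
streamed = list(transform_per_record(as_lines))
print(whole == streamed)
```

Both paths produce the same output; the difference is only in how much of the input has to be in memory while transforming.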

Regards,
Matt


On Thu, Sep 24, 2020 at 10:25 AM <na...@bt.com> wrote:
Hi Mark,

From what I can see (based on queues building before the processor and basically emptying after it), it is the Jolt processor we have problems with. We have tried adding more concurrency, reducing the run schedule and increasing the run duration, but it didn’t seem to resolve the high CPU load (~32 when processing at the rates described in my first email; when no traffic is processing it sits at 0.2).

It could be the completely wrong way of diagnosing this! I’ve struggled to find information (Apart from your great videos) to assist in getting to the bottom of it.

Kind Regards,

Nathan

From: Mark Payne [mailto:markap14@hotmail.com]
Sent: 24 September 2020 15:12
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Hey Nathan,

A quick clarification - is the bottleneck / the slow point in your flow actually consuming from Kafka or Jolt? From your original message it sounds like the bottleneck may actually be the Jolt Transform processor?

If the problem is in the ConsumeKafka processor, one thing you’ll want to look at is in the Settings tab, set the Yield Duration to “0 sec”. That can make a huge difference in performance from Kafka processors.

Thanks
-Mark


On Sep 24, 2020, at 10:07 AM, nathan.english@bt.com wrote:

Hi Bryan,

Thanks for this. My understanding of the concurrent tasks was incorrect. I thought it was across the whole cluster, not per node.

I did spend some time looking at the code for the demarcator as we had issues getting it batching. I think there may be a slight misunderstanding between my description and how it sounds.

When I say an Empty string, the message demarcator isn’t blank. I have used the checkbox ‘Set Empty String’, which means the processor treats the field as Null (From memory). If I left the field empty (checkbox not selected), it was one Kafka message to one flow file, which was a massive bottleneck.

I also seem to remember, from when I looked at the code, that the ConsumeKafkaRecord processors default the demarcator to null.

Kind Regards,

Nathan

From: Bryan Bende [mailto:bbende@gmail.com]
Sent: 24 September 2020 14:54
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Regarding the batching, I would have to double check the code, but since you said the demarcator is empty string, I think that means it is not batching and putting one message to one flow file. Basically if a demarcator is not set then batch size is ignored.

Regarding the processors/tasks, let's take one topic with 11 partitions as an example. If you make a consumer processor for this topic with 1 concurrent task, then you have 3 instances of this processor since you have a 3 node cluster, so you might end up with something like this...

node 1 - ConsumeKafka
  Task 1 - 4 partitions

node 2 - ConsumeKafka
  Task 1 - 4 partitions

node 3 - ConsumeKafka
  Task 1 - 3 partitions

It may not be exactly like that, but just an example as to how it should be assigned.

To add more parallelism you could then increase concurrent tasks up to maybe 4 and you get something like this...

node 1 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 2 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 3 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - nothing

If you go higher than 4 concurrent tasks you will just end up creating more consumers than partitions, and there is nothing to assign them.
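
The arithmetic behind these examples can be sketched in a few lines of Python. This is only a simplified model that deals partitions round-robin over nodes x concurrent tasks; the real assignment is decided by the Kafka consumer group's partition assignor and may differ in detail. The function names are hypothetical.

```python
def assign_partitions(partitions, nodes, tasks_per_node):
    """Deal partitions round-robin over nodes * tasks_per_node consumers.

    Returns {(node, task): partition_count}. This is a simplification: the
    real assignment is made by the Kafka consumer group's assignor.
    """
    consumers = [
        (node, task)
        for task in range(1, tasks_per_node + 1)
        for node in range(1, nodes + 1)
    ]
    counts = {consumer: 0 for consumer in consumers}
    for partition in range(partitions):
        counts[consumers[partition % len(consumers)]] += 1
    return counts


def idle_consumers(counts):
    """Consumers that were given no partition at all."""
    return sorted(c for c, n in counts.items() if n == 0)


one_task = assign_partitions(11, 3, 1)
four_tasks = assign_partitions(11, 3, 4)
twelve_tasks = assign_partitions(11, 3, 12)

print(one_task)
print(idle_consumers(four_tasks))
print(len(idle_consumers(twelve_tasks)))
```

Under this model, 1 task per node gives a 4/4/3 split, 4 tasks per node leaves one consumer idle, and 12 tasks per node (36 consumers for 11 partitions) leaves 25 consumers with nothing to do.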


On Thu, Sep 24, 2020 at 9:30 AM <na...@bt.com> wrote:
Hi Bryan,

We have configured the processor to read in a maximum batch size of 2k messages, which does seem to result in more than one Kafka message in the flow file.

Completely understand on the Load balancing, we tried several iterations of 1 task to one topic partition. However, we still found it to be loaded towards one specific node. I will try splitting it into multiple processors to see if this handles it any better. We have 10 topics with 11 partitions. (one topic with 2 partitions). So I should set all concurrent tasks to 1 with multiple processors (One processor per topic)?

Kind Regards,

Nathan
From: Bryan Bende [mailto:bbende@gmail.com]
Sent: 24 September 2020 13:59
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

I'm not sure how many topics you have, but the biggest improvement would probably be to not do a single message per flow file. You want to batch together lots of messages coming out of ConsumeKafka using a demarcator, and then convert/transform them in batches. You may need to have a separate consumer processor for each topic in order to do this correctly.

You generally shouldn't need to use a load balanced connection after ConsumeKafka. If one node is being favored it is likely that the number of partitions in your topic is not lining up with the # of NiFi nodes X # of concurrent consumer tasks. In the simplest case, if your topic had one partition, and you have 3 NiFi nodes with ConsumeKafka with 1 concurrent task each, then all your messages will only get consumed on one of the nodes. If you have 3 partitions then it should be roughly equal. In your case you have 3 nodes with concurrent tasks set to 12, so you have potentially 36 consumers, which means if you have anything less than 36 partitions then it is not going to be balanced equally.

On Thu, Sep 24, 2020 at 3:54 AM <na...@bt.com> wrote:
Hi Joe,

The Raids seem to give us a good IOPS number when we’ve tried testing them. We have seen a 300ms wait time on the Content Repo, hence why we have tried SSDs for the content repo as we assumed that was the bottleneck. The other Raids seemed OK to us.

I will certainly look into the ConsumeKafkaRecord processor today and will come up with a solution to use it. I will feedback on what I find.

In regards to our current flow configuration, we have the following high-level process groups:

•         ConsumeData -  Consumes the data from Kafka, Add some additional attributes, Converts to JSON from Avro

•         TransformData – This is where the majority of the work happens. Each flow file is routed on an attribute to a sub-process group for its type of data (decided by the kafka.topic attribute), where the processing happens (explained in more detail below)

•         ProduceData – Publishes the Record back to Kafka using the PublishKafkaRecord processor, add some attributes to calculate processing duration it took to process and logs this out.

•         Error Handling – A General Catch all. Luckily we don’t see this triggered very often, but it logs a message and writes the file to disk.
<image001.png>

ConsumeData process group:

•         ConsumeKafka, as mentioned in earlier emails, consumes the messages from multiple Kafka Topics with a Message Demarcator of an empty string and the Max poll Records set at 2,000.

•         The Consumed Flows queue uses RoundRobin load balancing as we have found one node is favoured for the consumption from Kafka so wanted to distribute the load.

•         AddAttributes uses the UpdateAttribute to add additional information to the flow file (Data Type, Output Kafka Topic) based on the Kafka.topic attribute on the flow file.

•         Finally, ConvertRecord converts the content of the flow file from Avro to JSON using the AvroReader and JsonRecordSetWriter Controller Services. In addition, we use the inbuilt AvroSchemaRegistry.
<image002.png>

TransformData process group. Due to the size, it’s not easy to show in a screenshot.

•         Flow files are routed based on attribute to 1 of 10 processing groups (based on the kafka.topic attribute)

•         Each Data Type has its own processing group; I have gone into detail on one below

•         There are two failure output ports: one for the initial routing issues, another for any errors that have happened in the sub-processing groups. There is also the Successful output port.

<image003.png>


HTTP Data Type Sub-processing group (Inside TransformData):

•         The sub-processing groups all follow a somewhat similar layout but have different custom processors after the initial Jolt Transformation

•         The Jolt Transformation does the majority of the heavy lifting in converting the message into the right format to match the output Avro schema requirements. The Jolt Spec contains a chain of operations including modify-overwrite-beta, shift, and default

•         Each processor after the Jolt transformation does something different to meet a specific requirement not possible with other Nifi processors. Some like the IPAddressProcessor are reused in processing for other data types. We are planning to move into a single post-processing group where all data types are routed, before being published to Kafka.
<image004.png>
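
For readers unfamiliar with Jolt, a chain spec of the kind described above has roughly the following shape. This is purely illustrative: the field names (method, src_ip, http.*) are hypothetical and are not taken from the actual spec used in this flow, which was not shared in the thread.

```json
[
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "method": "=toUpper(@(1,method))"
    }
  },
  {
    "operation": "shift",
    "spec": {
      "method": "http.method",
      "src_ip": "http.client.address"
    }
  },
  {
    "operation": "default",
    "spec": {
      "http": {
        "version": "unknown"
      }
    }
  }
]
```

Each operation in the chain receives the output of the previous one, so the cost per record grows with the number and complexity of the operations.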

Produce Data, produces the messages back to different Kafka topics which have been defined in the output.topic (set in the consume data stage)

•         PublishKafkaRecord converts the message from JSON back to Avro (Different schema to input) using the JSONTreeReader and AvroRecordSetWriter Controller services. We have set it to Guarantee Single Node Delivery, and use Snappy Compression.

•         Set Exit Attributes adds a Final Processing time to the flow file

•         The Calculate duration uses this to work out its overall time processing

•         Then it’s finally logged to file so we can analyse the full processing time.
<image005.png>

Below are the scheduling settings we are using for processors:
Group         | Processor           | Concurrent Tasks | Run Duration | Yield Duration | Other
Consume Data  | ConsumeKafka        | 12               | 0            | 1 ms           | Message Demarcator = "Empty string set"; Max Poll Records = 2,000
              | AddAttributes       | 10               | 0            | 1 s            |
              | ConvertRecord       | 10               | 0            | 1 s            |
TransformData | RouteOnAttribute    | 10               | 0            | 1 s            |
              | HTTP Jolt           | 5                | 0            | 200 ms         |
              | HTTP Post Processor | 2                | 0            | 1 s            |
Produce Data  | PublishKafka        | 10               | 0            | 1 s            | Max Request Size = 1 MB; Compression Type = snappy; batch.size = 80,000 (Based on some custom producers we have written in the past); linger.ms = 50,000 (Based on some custom producers we have written in the past)
              | Set Exit Attributes | 10               | 0            | 1 s            |
              | Calculate Duration  | 10               | 0            | 1 s            |
              | Log Success         | 10               | 0            | 1 s            |

The Queue configs are as follows:

Queue                                  | Back Pressure Object Threshold | Back Pressure Size Threshold | Load Balance
After ConsumeKafka                     | 20,000                         | 1 GB                         | Round Robin
After Route On Attribute               | 10,000                         | 1 GB                         | No
All Other Queues In TransformData      | 10,000                         | 1 GB                         | No
All Failure Routes                     | 10,000                         | 1 GB                         | No
Between ConsumerData And TransformData | 20,000                         | 1 GB                         | No
Between TransformData And ProduceData  | 20,000                         | 1 GB                         | No
All Other Queues In ConsumeData        | 20,000                         | 1 GB                         | No
Before Route On Attribute              | 20,000                         | 1 GB                         | No
All Queues In ProduceData              | 20,000                         | 1 GB                         | No

We also have a Maximum Timer Driven Thread Count of 200 set.

Stateless NiFi would fit this perfectly, but from what I have read, it’s not available in v1.9.2? We are stuck on 1.9.2 as we are using the Zookeeper provided as part of a Cloudera cluster. However, I’m wondering if we would be better suited to the volatile repositories, than writing to disk?

Thanks again for the advice so far Joe, you’ve given us some confidence it’s us doing something wrong and not something with NiFi.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com<ma...@gmail.com>]
Sent: 23 September 2020 19:05
To: users@nifi.apache.org<ma...@nifi.apache.org>
Subject: Re: NiFi V1.9.2 Performance

Nathan

Not sure what read/write rates you'll get in these RAID-10 configs but generally this seems like it should be fine (100s of MB/sec per node range at least).  Whereas now you're seeing about 20MB/sec/node.  This is definitely very low.
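
As a rough sanity check on the per-node figure, the totals quoted in the original post can be divided by the 5-minute window and the 3 nodes. This is only a sketch: it assumes 1 GB = 1000 MB and that load is spread evenly across the nodes, neither of which the thread states, and the function name is illustrative.

```python
def per_node_rate_mb_s(total_gb: float, seconds: float, nodes: int,
                       mb_per_gb: int = 1000) -> float:
    """Average per-node throughput in MB/s for a cluster-wide total."""
    return total_gb * mb_per_gb / seconds / nodes


WINDOW_S = 5 * 60   # measurements were taken over 5 minutes
NODES = 3           # 3-node cluster

# Figures quoted in the original post (cluster-wide, per 5 minutes).
rates = {
    "consumed (compressed)": per_node_rate_mb_s(9.48, WINDOW_S, NODES),
    "consumed (uncompressed)": per_node_rate_mb_s(38.53, WINDOW_S, NODES),
    "produced": per_node_rate_mb_s(16.84, WINDOW_S, NODES),
}

for label, rate in rates.items():
    print(f"{label}: {rate:.1f} MB/s per node")
```

Under those assumptions the produced volume works out to a little under 19 MB/s per node, which is in line with the "about 20MB/sec/node" estimate.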

If you review http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html then you'll see that we do actually capture attributes such as kafka.topic and so on.  Flowfiles would also be properly grouped by that.  What I'm not positive of is whether it could handle reading from multiple topics at the same time while also honoring and determining each of their distinct schemas.  Would need to test/verify that scenario to be sure.  If you do have a bunch of topics and they could grow/change then keeping this single processor approach makes sense.  If you can go the route of one ConsumeKafkaRecord per topic then obviously that would work well.

Not seeing your flow though I cannot be certain where the bottleneck(s) exist and provide guidance.  But this is without a doubt a vital skill to achieving maximum performance.

You'd have to show/share a ton more details for folks here to be helpful in walking through the full design.  Or explain the end to end flow.

As an additional food for thought if the flows are indeed 'from kafka -> do stuff -> back to kafka' this is likely a great use case for stateless-nifi.

Thanks

On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com>> wrote:
Hi Joe,

Thanks for getting back to me so quickly.

Our disk setup is as follows:
Path     | Storage Type                       | Format | Capacity | Content
/        | 100GB OS SSD                       | ext4   | 89.9GB   | OS, NiFi install, Logs
/data/1/ | 2 x 4TB SAS Hard Drives in RAID 1  | ext4   | 3.7TB    | Database and Flowfile Repos
/data/2/ | 8 x 4TB SAS Hard Drives in RAID 10 | ext4   | 14.6TB   | Content Repo
/data/3/ | 2 x 4TB SAS Hard Drives in RAID 1  | ext4   | 3.7TB    | Provenance Repo
/ssd     | 1 x 4TB PCIe NVMe SSD              | ext4   | 3.7TB    | Content Repo (used instead of /data/2/ as a test, to see if CPU was bottlenecked by disk operations)

I will certainly take a look at those. One question with the consume record processor is how I would consume from multiple topics and ensure the correct Avro schema is used to deserialise the message? We have a 1:1 mapping of schemas to topics. At the moment the ConsumeKafka processor is reading from all topics in one consumer. I’m assuming the kafka.topic attribute doesn’t exist at this stage? We use the Avro Schema Registry Controller as we don’t have a schema registry in place yet.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com<ma...@gmail.com>]
Sent: 23 September 2020 17:33
To: users@nifi.apache.org<ma...@nifi.apache.org>
Subject: Re: NiFi V1.9.2 Performance

Nathan

You have plenty powerful machines to hit super high speeds but what I cannot tell is how the disks are setup/capability and layout wise and relative to our three repos of importance.  You'll need to share those details.

That said, the design of the flow matters.  The Kafka processors that aren't record oriented will perform poorly unless they're acquiring data in their natural batches as they arrive from kafka.  In short, use the record oriented processors from Kafka.  In it you can even deal with the fact you want to go from AVRO to Json and so on.  These processors have a tougher learning curve but they perform extremely well and we have powerful processors to go along with them for common patterns.

You absolutely should be able to get to the big numbers you have seen.  It requires great flow design (powerful machines are secondary).

Thanks

On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com>> wrote:
Hi All,

We’ve got a NiFi 3 Node Cluster running on 3 x 40 CPU, 256GB RAM (32G Java Heap) servers. However, we have only been able to achieve a consumption of ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with a production rate of ~16.84GB out of the cluster over 5 minutes. This is much lower than we were expecting based on what we have read. With this throughput we see a CPU load of ~32 on all nodes, so we know there isn’t much else we can get out of the CPU.

We have also tried SSDs, Raided and Unraided HDDs for the content repo storage, but they haven’t made a difference to the amount we can process.

The process is as follows:

1.       Our flow reads from Kafka Compressed (Maximum of 2000 records per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 --> UpdateAttribute --> ConvertRecord)

2.       Depending on which topic the flow file is consumed from, we then send the message to one of 10 potential process groups, each containing between 3 and 5 processors within the process groups. (RouteOnAttribute --> Relevant Processing Group containing JoltTransformJSON and several custom processors we have made).

3.       Finally, we produce the flow file content back to one of several Kafka topics, based on the input topic name in Avro format with Snappy compression on the Kafka topic.

Inspecting the queued message counts, it indicates that the Jolt Transforms are taking the time to process (Large queues before JOLT processors, small or no queues afterwards). But I’m not sure why this is any worse than the rest of the processors as the event duration is less than a second when inspecting in provenance? We have tuned the number of concurrent tasks, duration and schedules to get the performance we have so far.

I’m not sure if there is anything anyone could recommend or suggest to try and make improvements? We need to achieve a rate around 5x of what it’s currently processing with the same number of nodes. We are running out of ideas on how to accomplish this and may have to consider alternatives.

Kind Regards,

Nathan


RE: NiFi V1.9.2 Performance

Posted by na...@bt.com.
Hi Eric,

Thanks for your suggestion. I have tried the volatile provenance repo today, and it did seem to make the cluster very unstable. I also had to increase the heap to 128G to make it even remotely testable. CPU load was above 40 on all nodes at the same volume of traffic as discussed beforehand.

Before that, I had switched out all Kafka Consumers to the Kafka Record Consumer processors (1 per topic). I have also replaced all the Jolt Transform processors with the Jolt Transform Record processors. Performance and CPU load remained relatively consistent (~32 CPU load) compared to the previous testing when using the standard processors (without Volatile Prov Repo). The bottlenecks still seem to be on Jolt, but I’m reluctant to up any of the concurrent tasks any higher as we are so high on CPU load.

I’m going to spend the afternoon comparing the nifi.properties files we have on the nodes to the default to see if there is anything obvious that would cause the high CPU.

Kind Regards,

Nathan

From: Eric O'Reilly [mailto:eric.oreilly@onyxpoint.com]
Sent: 28 September 2020 04:27
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan,

Hopefully, this finds your issues resolved. But if not, I do have another direction to investigate. In my experience, it's not enough to just move your content repo to SSD. Generally, I only see a gain if both the provenance repo and the content repo are on the same speed disk. But I might be biased as my flows take in high volumes and have a high number of provenance inserts to process. You can test this (if you're comfortable not recording your provenance for a while), by switching the implementation method of your provenance repository to volatile (see here for more details - https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#provenance-repository). It basically just keeps a set number of records in memory (see here on how to configure - https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#volatile-provenance-repository-properties), which removes any potential disk bottlenecks for the prov repo. I wouldn't want to run a production cluster like this and keep in mind it will impact your heap space (especially if you make the number of buffered records high), but it would be a starting point to determine if your bottleneck is related to disk IO for the provenance repository storage.

v/r,

Eric O'Reilly

On Fri, Sep 25, 2020 at 7:10 AM <na...@bt.com>> wrote:
Hi Matt,

We’ve now switched to the JoltTransformRecord processors. It does seem to be slightly better performing.

We are trying to switch out the ConsumeKafka_0_10 and ConvertRecord processors to the ConsumeKafkaRecord_0_10 processor based on feedback in this chain as well.

With the ConvertRecord processor, we used the kafka.topic attribute as it was available because the ConsumeKafka processor had completed on the flow file. The ConsumeKafkaRecord handles it all in one. When testing, it failed to parse as kafka.topic is not set.

As we are consuming from multiple topics each with a unique Avro schema the only way I can see to do this would be to have a separate processor and AvroReader controller service for each topic?

Kind Regards,

Nathan


From: Matt Burgess [mailto:mattyb149@apache.org<ma...@apache.org>]
Sent: 24 September 2020 17:08
To: users@nifi.apache.org<ma...@nifi.apache.org>
Subject: Re: NiFi V1.9.2 Performance

Nathan,

If you have multiple JSON messages in one flow file, is it in one large array, or a top-level JSON object with an array inside? Also are you trying to transform each message or the whole thing (i.e. do you need to know about more than one message at a time)? If you have a top-level array and are transforming each element in the array, you might get better performance out of JoltTransformRecord rather than JoltTransformJSON, as the latter reads the entire file into memory as a single JSON entity. If you have a top-level object then they will both read the whole thing in.

Regards,
Matt


On Thu, Sep 24, 2020 at 10:25 AM <na...@bt.com>> wrote:
Hi Mark,

From what I can see (based on queues building before the processor and being basically empty after), it is the Jolt processor we have problems with. We’ve tried adding more concurrency, reducing the run schedule and increasing the duration, but it didn’t seem to resolve the high CPU load (~32 when processing at the rates described in my first email; when no traffic is processing it sits at 0.2).

It could be the completely wrong way of diagnosing this! I’ve struggled to find information (Apart from your great videos) to assist in getting to the bottom of it.

Kind Regards,

Nathan

From: Mark Payne [mailto:markap14@hotmail.com<ma...@hotmail.com>]
Sent: 24 September 2020 15:12
To: users@nifi.apache.org<ma...@nifi.apache.org>
Subject: Re: NiFi V1.9.2 Performance

Hey Nathan,

A quick clarification - is the bottleneck / the slow point in your flow actually consuming from Kafka or Jolt? From your original message it sounds like the bottleneck may actually be the Jolt Transform processor?

If the problem is in the ConsumeKafka processor, one thing you’ll want to look at is in the Settings tab, set the Yield Duration to “0 sec”. That can make a huge difference in performance from Kafka processors.

Thanks
-Mark


On Sep 24, 2020, at 10:07 AM, nathan.english@bt.com<ma...@bt.com> wrote:

Hi Bryan,

Thanks for this. My understanding of the concurrent tasks was incorrect. I thought it was across the whole cluster, not per node.

I did spend some time looking at the code for the demarcator as we had issues getting it batching. I think there may be a slight misunderstanding between my description and how it sounds.

When I say an Empty string, the message demarcator isn’t blank. I have used the checkbox ‘Set Empty String’, which means the processor treats the field as Null (From memory). If I left the field empty (checkbox not selected), it was one Kafka message to one flow file, which was a massive bottleneck.

I also seem to remember from when I looked at the code. The ConsumeKafkaRecord processors defaults the demarcator to null.

Kind Regards,

Nathan

From: Bryan Bende [mailto:bbende@gmail.com]
Sent: 24 September 2020 14:54
To: users@nifi.apache.org<ma...@nifi.apache.org>
Subject: Re: NiFi V1.9.2 Performance

Regarding the batching, I would have to double check the code, but since you said the demarcator is empty string, I think that means it is not batching and putting one message to one flow file. Basically if a demarcator is not set then batch size is ignored.

Regarding the processors/tasks, lets take one topic with 11 partitions as an example, if you make a consumer processor for this topic with 1 concurrent task, then you have 3 instances of this processor since you have a 3 node cluster, so you might end up with something like this...

node 1 - ConsumeKafka
  Task 1 - 4 partitions

node 2 - ConsumeKafka
  Task 1 - 4 partitions

node 3 - ConsumeKafka
  Task 1 - 3 partitions

It may not be exactly like that, but just an example as to how it should be assigned.

To add more parallelism you could then increase concurrent tasks up to maybe 4 and you get something like this...

node 1 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 2 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 3 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - nothing

If you go higher than 4 concurrent tasks you will just end up creating more consumers than partitions, and there is nothing to assign them.
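
The assignment examples above can be reproduced with a small calculation. This is only a sketch of an even split: as noted, the real Kafka assignment may not come out exactly like this, and the function names are illustrative rather than part of NiFi or Kafka.

```python
def assign_partitions(partitions: int, nodes: int,
                      tasks_per_node: int) -> list[list[int]]:
    """Evenly split partitions over all consumer tasks in the cluster.

    Returns one list per node holding the partition count of each task.
    """
    consumers = nodes * tasks_per_node
    base, extra = divmod(partitions, consumers)
    # The first `extra` consumers take one partition more than the rest.
    counts = [base + (1 if i < extra else 0) for i in range(consumers)]
    return [counts[n * tasks_per_node:(n + 1) * tasks_per_node]
            for n in range(nodes)]


def idle_consumers(partitions: int, nodes: int, tasks_per_node: int) -> int:
    """Number of consumer tasks left with no partition to read from."""
    return max(0, nodes * tasks_per_node - partitions)


# One topic with 11 partitions on a 3-node cluster.
for tasks in (1, 4, 12):
    layout = assign_partitions(11, 3, tasks)
    idle = idle_consumers(11, 3, tasks)
    print(f"{tasks} task(s) per node: {layout}, idle consumers: {idle}")
```

With 12 concurrent tasks per node there are 36 consumers competing for 11 partitions, so most of them have nothing to read.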


On Thu, Sep 24, 2020 at 9:30 AM <na...@bt.com>> wrote:
Hi Bryan,

We have configured the processor to read in a maximum batch size of 2k messages, which does seem to result in more than one Kafka message in the flow file.

Completely understand on the Load balancing, we tried several iterations of 1 task to one topic partition. However, we still found it to be loaded towards one specific node. I will try splitting it into multiple processors to see if this handles it any better. We have 10 topics with 11 partitions. (one topic with 2 partitions). So I should set all concurrent tasks to 1 with multiple processors (One processor per topic)?

Kind Regards,

Nathan
From: Bryan Bende [mailto:bbende@gmail.com<ma...@gmail.com>]
Sent: 24 September 2020 13:59
To: users@nifi.apache.org<ma...@nifi.apache.org>
Subject: Re: NiFi V1.9.2 Performance

I'm not sure how many topics you have, but the biggest improvement would probably be to not do a single message per flow file. You want to batch together lots of messages coming out of ConsumeKafka using a demarcator, and then convert/transform them in batches. You may need to have a separate consumer processor for each topic in order to do this correctly.

You generally shouldn't need to use a load balanced connection after ConsumeKafka. If one node is being favored it is likely that the number of partitions in your topic is not lining up with the # of nifi nodes X # of concurrent consumer tasks. In the simplest case, if your topic had one partition, and you have 3 nifi nodes with Consumekafka with 1 concurrent task each, then all your messages will only get consumed on one of the nodes. If you have 3 partitions then it should be roughly equal. In your case you have 3 nodes with concurrent tasks set to 12, so you have potentially 36 consumers, which means if you have anything less than 36 partitions then it is not going to be balanced equally.

On Thu, Sep 24, 2020 at 3:54 AM <na...@bt.com>> wrote:
Hi Joe,

The Raids seem to give us a good IOPS number when we’ve tried testing them. We have seen a 300ms wait time on the Content Repo, hence why we have tried SSDs for the content repo as we assumed that was the bottleneck. The other Raids seemed OK to us.

I will certainly look into the ConsumeKafkaRecord processor today and will come up with a solution to use it. I will feedback on what I find.

In regards to our current flow configuration, we have the following high-level process groups:

•         ConsumeData -  Consumes the data from Kafka, Add some additional attributes, Converts to JSON from Avro

•         TransformData – This is where the majority of the work happens, it gets routed on an attribute to a set sub-process group based on the type of data it is (this is decided based on the kafka.topic attribute) where the processing happens (Explained in more detail below)

•         ProduceData – Publishes the records back to Kafka using the PublishKafkaRecord processor, adds some attributes to calculate the processing duration, and logs this out.

•         Error Handling – A General Catch all. Luckily we don’t see this triggered very often, but it logs a message and writes the file to disk.
<image001.png>

ConsumeData process group:

•         ConsumeKafka, as mentioned in earlier emails, consumes the messages from multiple Kafka Topics with a Message Demarcator of an empty string and the Max poll Records set at 2,000.

•         The Consumed Flows queue uses RoundRobin load balancing as we have found one node is favoured for the consumption from Kafka so wanted to distribute the load.

•         AddAttributes uses the UpdateAttribute processor to add additional information to the flow file (Data Type, Output Kafka Topic) based on the kafka.topic attribute on the flow file.

•         Finally, ConvertRecord converts the content of the flow file from Avro to JSON using the AvroReader and JsonRecordSetWriter Controller Services. In addition, we use the inbuilt AvroSchemaRegistry.
<image002.png>

TransformData process group. Due to the size, it’s not easy to show in a screenshot.

•         Flow files are routed based on attribute to 1 of 10 processing groups (based on the kafka.topic attribute)

•         Each Data Type has its own processing group; I have gone into detail on one below

•         There are two failure output ports: one for initial routing issues, another for any errors that have happened in the sub-processing groups. There is also the Successful output port.

<image003.png>


HTTP Data Type Sub-processing group (Inside TransformData):

•         The sub-processing groups all follow a somewhat similar layout but have different custom processors after the initial Jolt Transformation

•         The Jolt Transformation does the majority of the heavy lifting in converting the message into the right format to match the output Avro schema requirements. The Jolt Spec contains a chain of operations including modify-overwrite-beta, shift, and default

•         Each processor after the Jolt transformation does something different to meet a specific requirement not possible with other Nifi processors. Some like the IPAddressProcessor are reused in processing for other data types. We are planning to move into a single post-processing group where all data types are routed, before being published to Kafka.
<image004.png>

Produce Data, produces the messages back to different Kafka topics which have been defined in the output.topic (set in the consume data stage)

•         PublishKafkaRecord converts the message from JSON back to Avro (Different schema to input) using the JSONTreeReader and AvroRecordSetWriter Controller services. We have set it to Guarantee Single Node Delivery, and use Snappy Compression.

•         Set Exit Attributes adds a Final Processing time to the flow file

•         The Calculate duration uses this to work out its overall time processing

•         Then it’s finally logged to file so we can analyse the full processing time.
<image005.png>

Below are the scheduling settings we are using for processors:
Group         | Processor           | Concurrent Tasks | Run Duration | Yield Duration | Other
Consume Data  | ConsumeKafka        | 12               | 0            | 1 ms           | Message Demarcator = "Empty string set"; Max Poll Records = 2,000
              | AddAttributes       | 10               | 0            | 1 s            |
              | ConvertRecord       | 10               | 0            | 1 s            |
TransformData | RouteOnAttribute    | 10               | 0            | 1 s            |
              | HTTP Jolt           | 5                | 0            | 200 ms         |
              | HTTP Post Processor | 2                | 0            | 1 s            |
Produce Data  | PublishKafka        | 10               | 0            | 1 s            | Max Request Size = 1 MB; Compression Type = snappy; batch.size = 80,000 (Based on some custom producers we have written in the past); linger.ms = 50,000 (Based on some custom producers we have written in the past)
              | Set Exit Attributes | 10               | 0            | 1 s            |
              | Calculate Duration  | 10               | 0            | 1 s            |
              | Log Success         | 10               | 0            | 1 s            |

The Queue configs are as follows:

Queue                                  | Back Pressure Object Threshold | Back Pressure Size Threshold | Load Balance
After ConsumeKafka                     | 20,000                         | 1 GB                         | Round Robin
After Route On Attribute               | 10,000                         | 1 GB                         | No
All Other Queues In TransformData      | 10,000                         | 1 GB                         | No
All Failure Routes                     | 10,000                         | 1 GB                         | No
Between ConsumerData And TransformData | 20,000                         | 1 GB                         | No
Between TransformData And ProduceData  | 20,000                         | 1 GB                         | No
All Other Queues In ConsumeData        | 20,000                         | 1 GB                         | No
Before Route On Attribute              | 20,000                         | 1 GB                         | No
All Queues In ProduceData              | 20,000                         | 1 GB                         | No

We also have a Maximum Timer Driven Thread Count of 200 set.

Stateless NiFi would fit this perfectly, but from what I have read, it’s not available in v1.9.2? We are stuck on 1.9.2 as we are using the Zookeeper provided as part of a Cloudera cluster. However, I’m wondering if we would be better suited to the volatile repositories, than writing to disk?

Thanks again for the advice so far Joe, you’ve given us some confidence it’s us doing something wrong and not something with NiFi.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com<ma...@gmail.com>]
Sent: 23 September 2020 19:05
To: users@nifi.apache.org<ma...@nifi.apache.org>
Subject: Re: NiFi V1.9.2 Performance

Nathan

Not sure what read/write rates you'll get in these RAID-10 configs but generally this seems like it should be fine (100s of MB/sec per node range at least).  Whereas now you're seeing about 20MB/sec/node.  This is definitely very low.

If you review http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html then you'll see that we do actually capture attributes such as kafka.topic and so on.  Flowfiles would also be properly grouped by that.  What I'm not positive of is whether it could handle reading from multiple topics at the same time while also honoring and determining each of their distinct schemas.  Would need to test/verify that scenario to be sure.  If you do have a bunch of topics and they could grow/change then keeping this single processor approach makes sense.  If you can go the route of one ConsumeKafkaRecord per topic then obviously that would work well.

Not seeing your flow though I cannot be certain where the bottleneck(s) exist and provide guidance.  But this is without a doubt a vital skill to achieving maximum performance.

You'd have to show/share a ton more details for folks here to be helpful in walking through the full design.  Or explain the end to end flow.

As an additional food for thought if the flows are indeed 'from kafka -> do stuff -> back to kafka' this is likely a great use case for stateless-nifi.

Thanks

On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com>> wrote:
Hi Joe,

Thanks for getting back to me so quickly.

Our disk setup is as follows:
Path     | Storage Type                       | Format | Capacity | Content
/        | 100GB OS SSD                       | ext4   | 89.9GB   | OS, NiFi install, Logs
/data/1/ | 2 x 4TB SAS Hard Drives in RAID 1  | ext4   | 3.7TB    | Database and Flowfile Repos
/data/2/ | 8 x 4TB SAS Hard Drives in RAID 10 | ext4   | 14.6TB   | Content Repo
/data/3/ | 2 x 4TB SAS Hard Drives in RAID 1  | ext4   | 3.7TB    | Provenance Repo
/ssd     | 1 x 4TB PCIe NVMe SSD              | ext4   | 3.7TB    | Content Repo (used instead of /data/2/ as a test, to see if CPU was bottlenecked by disk operations)

I will certainly take a look at those. One question with the consume record processor is how I would consume from multiple topics and ensure the correct Avro schema is used to deserialise the message? We have a 1:1 mapping of schemas to topics. At the moment the ConsumeKafka processor is reading from all topics in one consumer. I’m assuming the kafka.topic attribute doesn’t exist at this stage? We use the Avro Schema Registry Controller as we don’t have a schema registry in place yet.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com<ma...@gmail.com>]
Sent: 23 September 2020 17:33
To: users@nifi.apache.org<ma...@nifi.apache.org>
Subject: Re: NiFi V1.9.2 Performance

Nathan

You have plenty powerful machines to hit super high speeds but what I cannot tell is how the disks are setup/capability and layout wise and relative to our three repos of importance.  You'll need to share those details.

That said, the design of the flow matters.  The Kafka processors that aren't record oriented will perform poorly unless they're acquiring data in their natural batches as they arrive from kafka.  In short, use the record oriented processors from Kafka.  In it you can even deal with the fact you want to go from AVRO to Json and so on.  These processors have a tougher learning curve but they perform extremely well and we have powerful processors to go along with them for common patterns.

You absolutely should be able to get to the big numbers you have seen.  It requires great flow design (powerful machines are secondary).

Thanks

On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com>> wrote:
Hi All,

We’ve got a NiFi 3 Node Cluster running on 3 x 40 CPU, 256GB RAM (32G Java Heap) servers. However, we have only been able to achieve a consumption of ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with a production rate of ~16.84GB out of the cluster over 5 minutes. This is much lower than we were expecting based on what we have read. With this throughput we see a CPU load of ~32 on all nodes, so we know there isn’t much else we can get out of the CPU.

We have also tried SSDs, Raided and Unraided HDDs for the content repo storage, but they haven’t made a difference to the amount we can process.

The process is as follows:

1.       Our flow reads from Kafka Compressed (Maximum of 2000 records per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 --> UpdateAttribute --> ConvertRecord)

2.       Depending on which topic the flow file is consumed from, we then send the message to one of 10 potential process groups, each containing between 3 and 5 processors within the process groups. (RouteOnAttribute --> Relevant Processing Group containing JoltTransformJSON and several custom processors we have made).

3.       Finally, we produce the flow file content back to one of several Kafka topics, based on the input topic name in Avro format with Snappy compression on the Kafka topic.

Inspecting the queued message counts, it indicates that the Jolt Transforms are taking the time to process (Large queues before JOLT processors, small or no queues afterwards). But I’m not sure why this is any worse than the rest of the processors as the event duration is less than a second when inspecting in provenance? We have tuned the number of concurrent tasks, duration and schedules to get the performance we have so far.

I’m not sure if there is anything anyone could recommend or suggest to try and make improvements? We need to achieve a rate around 5x of what it’s currently processing with the same number of nodes. We are running out of ideas on how to accomplish this and may have to consider alternatives.

Kind Regards,

Nathan


Re: NiFi V1.9.2 Performance

Posted by Eric O'Reilly <er...@onyxpoint.com>.
Nathan,

Hopefully, this finds your issues resolved. But if not, I do have another
direction to investigate. In my experience, it's not enough to just move
your content repo to SSD. Generally, I only see a gain if both the
provenance repo and the content repo are on the same speed disk. But I
might be biased as my flows take in high volumes and have a high number of
provenance inserts to process. You can test this (if you're comfortable not
recording your provenance for a while), by switching the
implementation method of your provenance repository to volatile (see here
for more details -
https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#provenance-repository).
It basically just keeps a set number of records in memory (see here on how
to configure -
https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#volatile-provenance-repository-properties),
which removes any potential disk bottlenecks for the prov repo. I wouldn't
want to run a production cluster like this and keep in mind it will impact
your heap space (especially if you make the number of buffered records
high), but it would be a starting point to determine if your bottleneck
is related to disk IO for the provenance repository storage.
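
For reference, the switch described above might look like this in nifi.properties. The property names are as I understand them from the Administration Guide sections linked above, and the buffer size is only an illustrative value, so please check both against the guide for your NiFi version. A restart is needed for the change to take effect.

```properties
# Keep provenance events in memory only (events are lost on restart)
nifi.provenance.repository.implementation=org.apache.nifi.provenance.VolatileProvenanceRepository
# Number of provenance events held in memory; larger values use more heap
nifi.provenance.repository.buffer.size=100000
```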

v/r,

Eric O'Reilly

On Fri, Sep 25, 2020 at 7:10 AM <na...@bt.com> wrote:

> Hi Matt,
>
>
>
> We’ve now switched to the JoltTransformRecord processors. It does seem to
> be slightly better performing.
>
>
>
> We are trying to switch out the ConsumeKafka_0_10 and ConvertRecord
> processors to the ConsumeKafkaRecord_0_10 processor based on feedback in
> this chain as well.
>
>
>
> With the ConvertRecord processor, we used the kafka.topic attribute as it
> was available because the ConsumeKafka processor had completed on the flow
> file. The ConsumeKafkaRecord handles it all in one. When testing, it failed
> to parse as kafka.topic is not set.
>
>
>
> As we are consuming from multiple topics each with a unique Avro schema
> the only way I can see to do this would be to have a separate processor and
> AvroReader controller service for each topic?
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
>
>
> *From:* Matt Burgess [mailto:mattyb149@apache.org]
> *Sent:* 24 September 2020 17:08
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan,
>
>
>
> If you have multiple JSON messages in one flow file, is it in one large
> array, or a top-level JSON object with an array inside? Also are you trying
> to transform each message or the whole thing (i.e. do you need to know
> about more than one message at a time)? If you have a top-level array and
> are transforming each element in the array, you might get better
> performance out of JoltTransformRecord rather than JoltTransformJSON, as
> the latter reads the entire file into memory as a single JSON entity. If
> you have a top-level object then they will both read the whole thing in.
>
>
>
> Regards,
>
> Matt
>
>
>
>
>
> On Thu, Sep 24, 2020 at 10:25 AM <na...@bt.com> wrote:
>
> Hi Mark,
>
>
>
> From what I can see (based on queues building before the processor and
> basically emptying after it), it is the Jolt processor we have problems with.
> We’ve tried adding more concurrency, reducing the run schedule and
> increasing the run duration, but it didn’t seem to resolve the high CPU load
> (~32 when processing at the rates described in my first email; when no
> traffic is processing it sits at 0.2).
>
>
>
> It could be the completely wrong way of diagnosing this! I’ve struggled to
> find information (Apart from your great videos) to assist in getting to the
> bottom of it.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Mark Payne [mailto:markap14@hotmail.com]
> *Sent:* 24 September 2020 15:12
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Hey Nathan,
>
>
>
> A quick clarification - is the bottleneck / the slow point in your flow
> actually consuming from Kafka or Jolt? From your original message it sounds
> like the bottleneck may actually be the Jolt Transform processor?
>
>
>
> If the problem is in the ConsumeKafka processor, one thing you’ll want to
> look at is in the Settings tab, set the Yield Duration to “0 sec”. That can
> make a huge difference in performance from Kafka processors.
>
>
>
> Thanks
>
> -Mark
>
>
>
>
>
> On Sep 24, 2020, at 10:07 AM, nathan.english@bt.com wrote:
>
>
>
> Hi Bryan,
>
>
>
> Thanks for this. My understanding of the concurrent tasks was incorrect. I
> thought it was across the whole cluster, not per node.
>
>
>
> I did spend some time looking at the code for the demarcator as we had
> issues getting it batching. I think there may be a slight misunderstanding
> between my description and how it sounds.
>
>
>
> When I say an Empty string, the message demarcator isn’t blank. I have
> used the checkbox ‘Set Empty String’, which means the processor treats the
> field as Null (From memory). If I left the field empty (checkbox not
> selected), it was one Kafka message to one flow file, which was a massive
> bottleneck.
>
>
>
> I also seem to remember, from when I looked at the code, that the
> ConsumeKafkaRecord processor defaults the demarcator to null.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Bryan Bende [mailto:bbende@gmail.com <bb...@gmail.com>]
> *Sent:* 24 September 2020 14:54
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Regarding the batching, I would have to double check the code, but since
> you said the demarcator is empty string, I think that means it is not
> batching and putting one message to one flow file. Basically if a
> demarcator is not set then batch size is ignored.
>
>
>
> Regarding the processors/tasks, let's take one topic with 11 partitions as
> an example, if you make a consumer processor for this topic with 1
> concurrent task, then you have 3 instances of this processor since you have
> a 3 node cluster, so you might end up with something like this...
>
>
>
> node 1 - ConsumeKafka
>
>   Task 1 - 4 partitions
>
>
>
> node 2 - ConsumeKafka
>
>   Task 1 - 4 partitions
>
>
>
> node 3 - ConsumeKafka
>
>   Task 1 - 3 partitions
>
>
>
> It may not be exactly like that, but just an example as to how it should
> be assigned.
>
>
>
> To add more parallelism you could then increase concurrent tasks up to
> maybe 4 and you get something like this...
>
>
>
> node 1 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - 1 partition
>
>
>
> node 2 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - 1 partition
>
>
>
> node 3 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - nothing
>
>
>
> If you go higher than 4 concurrent tasks you will just end up creating
> more consumers than partitions, and there is nothing to assign them.
>
>
>
>
>
> On Thu, Sep 24, 2020 at 9:30 AM <na...@bt.com> wrote:
>
> Hi Bryan,
>
>
>
> We have configured the processor to read in a maximum batch size of 2k
> messages, which does seem to result in more than one Kafka message in the
> flow file.
>
>
>
> Completely understand on the Load balancing, we tried several iterations
> of 1 task to one topic partition. However, we still found it to be loaded
> towards one specific node. I will try splitting it into multiple processors
> to see if this handles it any better. We have 10 topics with 11 partitions.
> (one topic with 2 partitions). So I should set all concurrent tasks to 1
> with multiple processors (One processor per topic)?
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
> *From:* Bryan Bende [mailto:bbende@gmail.com]
> *Sent:* 24 September 2020 13:59
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> I'm not sure how many topics you have, but the biggest improvement would
> probably be to not do a single message per flow file. You want to batch
> together lots of messages coming out of ConsumeKafka using a demarcator,
> and then convert/transform them in batches. You may need to have a separate
> consumer processor for each topic in order to do this correctly.
>
>
>
> You generally shouldn't need to use a load balanced connection after
> ConsumeKafka. If one node is being favored it is likely that the number of
> partitions in your topic is not lining up with the # of nifi nodes X # of
> concurrent consumer tasks. In the simplest case, if your topic had one
> partition, and you have 3 NiFi nodes with ConsumeKafka with 1 concurrent
> task each, then all your messages will only get consumed on one of the
> nodes. If you have 3 partitions then it should be roughly equal. In your
> case you have 3 nodes with concurrent tasks set to 12, so you have
> potentially 36 consumers, which means if you have anything less than 36
> partitions then it is not going to be balanced equally.
>
>
>
> On Thu, Sep 24, 2020 at 3:54 AM <na...@bt.com> wrote:
>
> Hi Joe,
>
>
>
> The Raids seem to give us a good IOPS number when we’ve tried testing
> them. We have seen a 300ms wait time on the Content Repo, hence why we have
> tried SSDs for the content repo as we assumed that was the bottleneck. The
> other Raids seemed OK to us.
>
>
>
> I will certainly look into the ConsumeKafkaRecord processor today and will
> come up with a solution to use it. I will feedback on what I find.
>
>
>
> In regards to our current flow configuration, we have the following
> high-level process groups:
>
> ·         ConsumeData -  Consumes the data from Kafka, Add some
> additional attributes, Converts to JSON from Avro
>
> ·         TransformData – This is where the majority of the work happens,
> it gets routed on an attribute to a set sub-process group based on the type
> of data it is (this is decided based on the kafka.topic attribute) where
> the processing happens (Explained in more detail below)
>
> ·         ProduceData – Publishes the Record back to Kafka using the
> PublishKafkaRecord processor, add some attributes to calculate processing
> duration it took to process and logs this out.
>
> ·         Error Handling – A General Catch all. Luckily we don’t see this
> triggered very often, but it logs a message and writes the file to disk.
>
> <image001.png>
>
>
>
> *ConsumeData* process group:
>
> ·         ConsumeKafka, as mentioned in earlier emails, consumes the
> messages from multiple Kafka Topics with a Message Demarcator of an empty
> string and the Max poll Records set at 2,000.
>
> ·         The Consumed Flows queue uses RoundRobin load balancing as we
> have found one node is favoured for the consumption from Kafka so wanted to
> distribute the load.
>
> ·         AddAttributes uses the UpdateAttribute to add additional
> information to the flow file (Data Type, Output Kafka Topic) based on the
> Kafka.topic attribute on the flow file.
>
> ·         Finally, Convert Record converts the content of the flow file
> from Avro to JSON using the AvroReader and JsonRecordSetWriter Controller
> Services. In addition, we use the inbuilt AvroSchemaRegistry.
>
> <image002.png>
>
>
>
> *TransformData* process group. Due to the size, it’s not easy to show in
> a screenshot.
>
> ·         Flow files are routed based on attribute to 1 of 10 processing
> groups (based on the kafka.topic attribute)
>
> ·         Each data type has its own processing group; I have gone into
> detail on one below
>
> ·         There are two failure output ports: one for the initial routing
> issues, another for any errors that have happened in the sub-processing
> groups. There is also the Successful output port.
>
>
>
> <image003.png>
>
>
>
>
>
> *HTTP Data Type Sub-processing group (Inside TransformData):*
>
> ·         The sub-processing groups all follow a somewhat similar layout
> but have different custom processors after the initial Jolt Transformation
>
> ·         The Jolt Transformation does the majority of the heavy lifting
> in converting the message into the right format to match the output Avro
> schema requirements. The Jolt Spec contains a chain of operations including
> modify-overwrite-beta, shift, and default
>
> ·         Each processor after the Jolt transformation does something
> different to meet a specific requirement not possible with other Nifi
> processors. Some like the IPAddressProcessor are reused in processing for
> other data types. We are planning to move into a single post-processing
> group where all data types are routed, before being published to Kafka.
>
> <image004.png>
>
>
>
> *Produce Data*, produces the messages back to different Kafka topics
> which have been defined in the output.topic (set in the consume data stage)
>
> ·         PublishKafkaRecord converts the message from JSON back to Avro
> (Different schema to input) using the JSONTreeReader and
> AvroRecordSetWriter Controller services. We have set it to Guarantee Single
> Node Delivery, and use Snappy Compression.
>
> ·         Set Exit Attributes adds a Final Processing time to the flow
> file
>
> ·         The Calculate duration uses this to work out its overall time
> processing
>
> ·         Then it’s finally logged to file so we can analyse the full
> processing time.
>
> <image005.png>
>
>
>
> Below are the scheduling settings we are using for processors:
>
> Group         | Processor           | Concurrent Tasks | Run Duration | Yield Duration | Other
> --------------+---------------------+------------------+--------------+----------------+------
> Consume Data  | ConsumeKafka        | 12               | 0            | 1 ms           | Message Demarcator = "Empty string set"; Max Poll Records = 2,000
>               | AddAttributes       | 10               | 0            | 1 s            |
>               | ConvertRecord       | 10               | 0            | 1 s            |
> TransformData | RouteOnAttribute    | 10               | 0            | 1 s            |
>               | HTTP Jolt           | 5                | 0            | 200 ms         |
>               | HTTP Post Processor | 2                | 0            | 1 s            |
> Produce Data  | PublishKafka        | 10               | 0            | 1 s            | Max Request Size = 1 MB; Compression Type = snappy; batch.size = 80,000 (based on some custom producers we have written in the past); linger.ms = 50,000 (based on some custom producers we have written in the past)
>               | Set Exit Attributes | 10               | 0            | 1 s            |
>               | Calculate Duration  | 10               | 0            | 1 s            |
>               | Log Success         | 10               | 0            | 1 s            |
>
>
>
> The Queue configs are as follows:
>
>
>
> Queue                                 | Back Pressure Object Threshold | Back Pressure Size Threshold | Load Balance
> --------------------------------------+--------------------------------+------------------------------+-------------
> After ConsumeKafka                    | 20,000                         | 1 GB                         | Round Robin
> After Route On Attribute              | 10,000                         | 1 GB                         | No
> All Other Queues In TransformData     | 10,000                         | 1 GB                         | No
> All Failure Routes                    | 10,000                         | 1 GB                         | No
> Between ConsumeData And TransformData | 20,000                         | 1 GB                         | No
> Between TransformData And ProduceData | 20,000                         | 1 GB                         | No
> All Other Queues In ConsumeData       | 20,000                         | 1 GB                         | No
> Before Route On Attribute             | 20,000                         | 1 GB                         | No
> All Queues In ProduceData             | 20,000                         | 1 GB                         | No
>
>
>
> We also have a Maximum Timer Driven Thread Count of 200 set.
>
>
>
> Stateless NiFi would fit this perfectly, but from what I have read, it’s
> not available in v1.9.2? We are stuck on 1.9.2 as we are using the
> Zookeeper provided as part of a Cloudera cluster. However, I’m wondering if
> we would be better suited to the volatile repositories, than writing to
> disk?
>
>
>
> Thanks again for the advice so far Joe, you’ve given us some confidence
> it’s us doing something wrong and not something with NiFi.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Joe Witt [mailto:joe.witt@gmail.com]
> *Sent:* 23 September 2020 19:05
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan
>
>
>
> Not sure what read/write rates you'll get in these RAID-10 configs but
> generally this seems like it should be fine (100s of MB/sec per node range
> at least).  Whereas now you're seeing about 20MB/sec/node.  This is
> definitely very low.
>
>
>
> If you review
> http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html
>  then you'll see that we do actually capture attributes such as
> kafka.topic and so on.  Flowfiles would also be properly grouped by that.
> What I'm not positive of is it could handle reading from multiple topics at
> the same time while also honoring and determining each of their distinct
> schemas.  Would need to test/verify that scenario to be sure.  If you do
> have a bunch of topics and they could grow/change then keeping this single
> processor approach makes sense.  If you can go the route of one
> ConsumeKafkaRecord per topic then obviously that would work well.
>
>
>
> Not seeing your flow though I cannot be certain where the bottleneck(s)
> exist and provide guidance.  But this is without a doubt a vital skill to
> achieving maximum performance.
>
>
>
> You'd have to show/share a ton more details for folks here to be helpful
> in walking through the full design.  Or explain the end to end flow.
>
>
>
> As an additional food for thought if the flows are indeed 'from kafka ->
> do stuff -> back to kafka' this is likely a great use case for
> stateless-nifi.
>
>
>
> Thanks
>
>
>
> On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com> wrote:
>
> Hi Joe,
>
>
>
> Thanks for getting back to me so quickly.
>
>
>
> Our disk setup is as follows:
>
> Path     | Storage Type                       | Format | Capacity | Content
> ---------+------------------------------------+--------+----------+--------
> /        | 100GB OS SSD                       | ext4   | 89.9GB   | OS, NiFi install, Logs
> /data/1/ | 2 x 4TB SAS Hard Drives in RAID 1  | ext4   | 3.7TB    | Database and Flowfile Repos
> /data/2/ | 8 x 4TB SAS Hard Drives in RAID 10 | ext4   | 14.6TB   | Content Repo
> /data/3/ | 2 x 4TB SAS Hard Drives in RAID 1  | ext4   | 3.7TB    | Provenance Repo
> /ssd     | 1 x 4TB PCIe NVMe SSD              | ext4   | 3.7TB    | Content Repo (used instead of /data/2/ as a test, to see if CPU was bottlenecked by disk operations)
>
>
>
> I will certainly take a look at those. One question with the consume
> record processor is how I would consume from multiple topics and ensure the
> correct Avro schema is used to deserialise the message? We have 1:1 mapping
> of schemas to topics. At the moment the ConsumeKafka processor is reading
> from all topics in one consumer. I’m assuming the attribute kafka.topic
> attribute doesn’t exist at this stage? We use the Avro Schema Registry
> Controller as we don’t have a schema registry in place yet.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Joe Witt [mailto:joe.witt@gmail.com]
> *Sent:* 23 September 2020 17:33
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan
>
>
>
> You have plenty powerful machines to hit super high speeds but what I
> cannot tell is how the disks are setup/capability and layout wise and
> relative to our three repos of importance.  You'll need to share those
> details.
>
>
>
> That said, the design of the flow matters.  The Kafka processors that
> aren't record oriented will perform poorly unless they're acquiring data in
> their natural batches as they arrive from kafka.  In short, use the record
> oriented processors from Kafka.  In it you can even deal with the fact you
> want to go from AVRO to Json and so on.  These processors have a tougher
> learning curve but they perform extremely well and we have powerful
> processors to go along with them for common patterns.
>
>
>
> You absolutely should be able to get to the big numbers you have seen.  It
> requires great flow design (powerful machines are secondary).
>
>
>
> Thanks
>
>
>
> On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com> wrote:
>
> Hi All,
>
>
>
> We’ve got a 3-node NiFi cluster running on 3 x 40 CPU, 256GB RAM (32GB Java
> heap) servers. However, we have only been able to achieve a consumption rate
> of ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with a
> production rate of ~16.84GB out of the cluster over 5 minutes. This is much
> lower than we were expecting based on what we have read. With this
> throughput we see a CPU load of ~32 on all nodes, so we know there isn’t much
> else we can get out of the CPU.
>
>
>
> We have also tried SSDs, Raided and Unraided HDDs for the content repo
> storage, but they haven’t made a difference to the amount we can process.
>
>
>
> The process is as follows:
>
> 1.       Our flow reads from Kafka Compressed (Maximum of 2000 records
> per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 -->
> UpdateAttribute --> ConvertRecord)
>
> 2.       Depending on which topic the flow file is consumed from, we then
> send the message to one of 10 potential process groups, each containing
> between 3 and 5 processors within the process groups. (RouteOnAttribute --> Relevant
> Processing Group containing JoltTransformJSON and several custom processors
> we have made).
>
> 3.       Finally, we produce the flow file content back to one of several
> Kafka topics, based on the input topic name in Avro format with Snappy
> compression on the Kafka topic.
>
>
>
> Inspecting the queued message counts, it indicates that the Jolt
> Transforms are taking the time to process (Large queues before JOLT
> processors, small or no queues afterwards). But I’m not sure why this is
> any worse than the rest of the processors as the event duration is less
> than a second when inspecting in provenance? We have tuned the number of
> concurrent tasks, duration and schedules to get the performance we have so
> far.
>
>
>
> I’m not sure if there is anything anyone could recommend or suggest to try
> and make improvements? We need to achieve a rate around 5x of what it’s
> currently processing with the same number of nodes. We are running out of
> ideas on how to accomplish this and may have to consider alternatives.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
>

RE: NiFi V1.9.2 Performance

Posted by na...@bt.com.
Hi Matt,

We’ve now switched to the JoltTransformRecord processors. It does seem to be slightly better performing.

We are trying to switch out the ConsumeKafka_0_10 and ConvertRecord processors to the ConsumeKafkaRecord_0_10 processor based on feedback in this chain as well.

With the ConvertRecord processor, we used the kafka.topic attribute as it was available because the ConsumeKafka processor had completed on the flow file. The ConsumeKafkaRecord handles it all in one. When testing, it failed to parse as kafka.topic is not set.

As we are consuming from multiple topics each with a unique Avro schema the only way I can see to do this would be to have a separate processor and AvroReader controller service for each topic?

Kind Regards,

Nathan


From: Matt Burgess [mailto:mattyb149@apache.org]
Sent: 24 September 2020 17:08
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan,

If you have multiple JSON messages in one flow file, is it in one large array, or a top-level JSON object with an array inside? Also are you trying to transform each message or the whole thing (i.e. do you need to know about more than one message at a time)? If you have a top-level array and are transforming each element in the array, you might get better performance out of JoltTransformRecord rather than JoltTransformJSON, as the latter reads the entire file into memory as a single JSON entity. If you have a top-level object then they will both read the whole thing in.
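
To make the distinction concrete, here is a small Python sketch (not NiFi code) that reports which of the two shapes a flow file's JSON content has. The function name and sample payloads are made up for illustration.

```python
import json


def top_level_shape(content):
    """Return 'array', 'object' or 'other' for the top-level JSON value."""
    doc = json.loads(content)
    if isinstance(doc, list):
        return "array"
    if isinstance(doc, dict):
        return "object"
    return "other"


if __name__ == "__main__":
    # One record per array element
    print(top_level_shape('[{"id": 1}, {"id": 2}]'))
    # Records nested inside a wrapper object
    print(top_level_shape('{"records": [{"id": 1}, {"id": 2}]}'))
```

Running it against a sample of the content coming out of ConvertRecord should show which of the two cases applies.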

Regards,
Matt


On Thu, Sep 24, 2020 at 10:25 AM <na...@bt.com> wrote:
Hi Mark,

From what I can see (based on queues building before the processor and basically emptying after it), it is the Jolt processor we have problems with. We’ve tried adding more concurrency, reducing the run schedule and increasing the run duration, but it didn’t seem to resolve the high CPU load (~32 when processing at the rates described in my first email; when no traffic is processing it sits at 0.2).

It could be the completely wrong way of diagnosing this! I’ve struggled to find information (Apart from your great videos) to assist in getting to the bottom of it.

Kind Regards,

Nathan

From: Mark Payne [mailto:markap14@hotmail.com]
Sent: 24 September 2020 15:12
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Hey Nathan,

A quick clarification - is the bottleneck / the slow point in your flow actually consuming from Kafka or Jolt? From your original message it sounds like the bottleneck may actually be the Jolt Transform processor?

If the problem is in the ConsumeKafka processor, one thing you’ll want to look at is in the Settings tab, set the Yield Duration to “0 sec”. That can make a huge difference in performance from Kafka processors.

Thanks
-Mark


On Sep 24, 2020, at 10:07 AM, nathan.english@bt.com wrote:

Hi Bryan,

Thanks for this. My understanding of the concurrent tasks was incorrect. I thought it was across the whole cluster, not per node.

I did spend some time looking at the code for the demarcator as we had issues getting it batching. I think there may be a slight misunderstanding between my description and how it sounds.

When I say an Empty string, the message demarcator isn’t blank. I have used the checkbox ‘Set Empty String’, which means the processor treats the field as Null (From memory). If I left the field empty (checkbox not selected), it was one Kafka message to one flow file, which was a massive bottleneck.

I also seem to remember, from when I looked at the code, that the ConsumeKafkaRecord processor defaults the demarcator to null.

Kind Regards,

Nathan

From: Bryan Bende [mailto:bbende@gmail.com]
Sent: 24 September 2020 14:54
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Regarding the batching, I would have to double check the code, but since you said the demarcator is empty string, I think that means it is not batching and putting one message to one flow file. Basically if a demarcator is not set then batch size is ignored.

Regarding the processors/tasks, let's take one topic with 11 partitions as an example. If you make a consumer processor for this topic with 1 concurrent task, then you have 3 instances of this processor since you have a 3 node cluster, so you might end up with something like this...

node 1 - ConsumeKafka
  Task 1 - 4 partitions

node 2 - ConsumeKafka
  Task 1 - 4 partitions

node 3 - ConsumeKafka
  Task 1 - 3 partitions

It may not be exactly like that, but just an example as to how it should be assigned.

To add more parallelism you could then increase concurrent tasks up to maybe 4 and you get something like this...

node 1 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 2 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 3 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - nothing

If you go higher than 4 concurrent tasks you will just end up creating more consumers than partitions, and there is nothing to assign them.
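
The assignment described above can be sketched in Python. This is a simplification: the real assignment is decided by Kafka's group coordinator and the configured assignor, so which consumer gets which partition will differ. Only the counts are meant to be illustrative, and the function names are hypothetical.

```python
def assign_partitions(num_partitions, num_nodes, tasks_per_node):
    """Spread partitions round-robin over every (node, task) consumer.

    Returns a dict mapping (node, task) to the number of partitions
    assigned. This models counts only, not Kafka's actual assignor.
    """
    consumers = [
        (node, task)
        for task in range(1, tasks_per_node + 1)
        for node in range(1, num_nodes + 1)
    ]
    counts = {consumer: 0 for consumer in consumers}
    for partition in range(num_partitions):
        counts[consumers[partition % len(consumers)]] += 1
    return counts


def idle_consumers(counts):
    """Number of consumers that were given no partition at all."""
    return sum(1 for assigned in counts.values() if assigned == 0)


if __name__ == "__main__":
    for tasks in (1, 4, 12):
        counts = assign_partitions(11, 3, tasks)
        print(f"{tasks} task(s) per node: {len(counts)} consumers, "
              f"{idle_consumers(counts)} idle")
```

With 11 partitions and 3 nodes, 1 task per node gives a 4/4/3 split, 4 tasks per node leaves one consumer idle, and 12 tasks per node leaves 25 of the 36 consumers with nothing to do.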


On Thu, Sep 24, 2020 at 9:30 AM <na...@bt.com> wrote:
Hi Bryan,

We have configured the processor to read in a maximum batch size of 2k messages, which does seem to result in more than one Kafka message in the flow file.

Completely understand on the Load balancing, we tried several iterations of 1 task to one topic partition. However, we still found it to be loaded towards one specific node. I will try splitting it into multiple processors to see if this handles it any better. We have 10 topics with 11 partitions. (one topic with 2 partitions). So I should set all concurrent tasks to 1 with multiple processors (One processor per topic)?

Kind Regards,

Nathan
From: Bryan Bende [mailto:bbende@gmail.com]
Sent: 24 September 2020 13:59
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

I'm not sure how many topics you have, but the biggest improvement would probably be to not do a single message per flow file. You want to batch together lots of messages coming out of ConsumeKafka using a demarcator, and then convert/transform them in batches. You may need to have a separate consumer processor for each topic in order to do this correctly.
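
As a rough illustration of the batching idea (this is not how ConsumeKafka is implemented, and the function name is hypothetical), the following Python sketch joins messages into batches with a demarcator so that downstream steps handle one flow file per batch rather than one per message.

```python
def batch_messages(messages, demarcator, max_records):
    """Group messages into batches of up to max_records, joined by demarcator.

    Each returned element stands in for the content of one flow file.
    """
    if max_records < 1:
        raise ValueError("max_records must be at least 1")
    return [
        demarcator.join(messages[i:i + max_records])
        for i in range(0, len(messages), max_records)
    ]


if __name__ == "__main__":
    messages = [b"msg-%d" % i for i in range(5000)]
    batches = batch_messages(messages, b"\n", 2000)
    # 5000 messages at up to 2000 per batch gives 3 flow files instead of 5000
    print(len(batches))
```

The per-flow-file overhead (repository updates, provenance events, scheduling) is then paid once per batch instead of once per message.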

You generally shouldn't need to use a load balanced connection after ConsumeKafka. If one node is being favored it is likely that the number of partitions in your topic is not lining up with the # of NiFi nodes x # of concurrent consumer tasks. In the simplest case, if your topic had one partition, and you have 3 NiFi nodes with ConsumeKafka with 1 concurrent task each, then all your messages will only get consumed on one of the nodes. If you have 3 partitions then it should be roughly equal. In your case you have 3 nodes with concurrent tasks set to 12, so you have potentially 36 consumers, which means if you have anything less than 36 partitions then it is not going to be balanced equally.

On Thu, Sep 24, 2020 at 3:54 AM <na...@bt.com> wrote:
Hi Joe,

The Raids seem to give us a good IOPS number when we’ve tried testing them. We have seen a 300ms wait time on the Content Repo, hence why we have tried SSDs for the content repo as we assumed that was the bottleneck. The other Raids seemed OK to us.

I will certainly look into the ConsumeKafkaRecord processor today and will come up with a solution to use it. I will feedback on what I find.

In regards to our current flow configuration, we have the following high-level process groups:

•         ConsumeData -  Consumes the data from Kafka, Add some additional attributes, Converts to JSON from Avro

•         TransformData – This is where the majority of the work happens, it gets routed on an attribute to a set sub-process group based on the type of data it is (this is decided based on the kafka.topic attribute) where the processing happens (Explained in more detail below)

•         ProduceData – Publishes the Record back to Kafka using the PublishKafkaRecord processor, add some attributes to calculate processing duration it took to process and logs this out.

•         Error Handling – A General Catch all. Luckily we don’t see this triggered very often, but it logs a message and writes the file to disk.
<image001.png>

ConsumeData process group:

•         ConsumeKafka, as mentioned in earlier emails, consumes the messages from multiple Kafka Topics with a Message Demarcator of an empty string and the Max poll Records set at 2,000.

•         The Consumed Flows queue uses RoundRobin load balancing as we have found one node is favoured for the consumption from Kafka so wanted to distribute the load.

•         AddAttributes uses the UpdateAttribute to add additional information to the flow file (Data Type, Output Kafka Topic) based on the Kafka.topic attribute on the flow file.

•         Finally, Convert Record converts the content of the flow file from Avro to JSON using the AvroReader and JsonRecordSetWriter Controller Services. In addition, we use the inbuilt AvroSchemaRegistry.
<image002.png>

TransformData process group. Due to the size, it’s not easy to show in a screenshot.

•         Flow files are routed based on attribute to 1 of 10 processing groups (based on the kafka.topic attribute)

•         Each data type has its own processing group; I have gone into detail on one below

•         There are two failure output ports: one for the initial routing issues, another for any errors that have happened in the sub-processing groups. There is also the Successful output port.

<image003.png>


HTTP Data Type Sub-processing group (Inside TransformData):

•         The sub-processing groups all follow a somewhat similar layout but have different custom processors after the initial Jolt Transformation

•         The Jolt Transformation does the majority of the heavy lifting in converting the message into the right format to match the output Avro schema requirements. The Jolt Spec contains a chain of operations including modify-overwrite-beta, shift, and default

•         Each processor after the Jolt transformation does something different to meet a specific requirement not possible with other NiFi processors. Some, like the IPAddressProcessor, are reused in processing for other data types. We are planning to move these into a single post-processing group where all data types are routed, before being published to Kafka.
<image004.png>

Produce Data, produces the messages back to different Kafka topics which have been defined in the output.topic (set in the consume data stage)

•         PublishKafkaRecord converts the message from JSON back to Avro (different schema to the input) using the JsonTreeReader and AvroRecordSetWriter controller services. We have set it to Guarantee Single Node Delivery, and use Snappy compression.

•         Set Exit Attributes adds a final processing time to the flow file.

•         Calculate Duration uses this to work out the overall processing time.

•         Then it’s finally logged to file so we can analyse the full processing time.
<image005.png>

Below are the scheduling settings we are using for processors:

Group          Processor            Concurrent  Run       Yield     Other
                                    Tasks       Duration  Duration
Consume Data   ConsumeKafka         12          0         1 ms      Message Demarcator = "Empty string set"
                                                                    Max Poll Records = 2,000
               AddAttributes        10          0         1 s
               ConvertRecord        10          0         1 s
TransformData  RouteOnAttribute     10          0         1 s
               HTTP Jolt            5           0         200 ms
               HTTP Post Processor  2           0         1 s
Produce Data   PublishKafka         10          0         1 s       Max Request Size = 1 MB
                                                                    Compression Type = snappy
                                                                    batch.size = 80,000 (Based on some custom producers we have written in the past)
                                                                    linger.ms = 50,000 (Based on some custom producers we have written in the past)
               Set Exit Attributes  10          0         1 s
               Calculate Duration   10          0         1 s
               Log Success          10          0         1 s

The Queue configs are as follows:

Queue                                   Back Pressure     Back Pressure   Load Balance
                                        Object Threshold  Size Threshold
After ConsumeKafka                      20,000            1 GB            Round Robin
After Route On Attribute                10,000            1 GB            No
All Other Queues In TransformData       10,000            1 GB            No
All Failure Routes                      10,000            1 GB            No
Between ConsumeData And TransformData   20,000            1 GB            No
Between TransformData And ProduceData   20,000            1 GB            No
All Other Queues In ConsumeData         20,000            1 GB            No
Before Route On Attribute               20,000            1 GB            No
All Queues In ProduceData               20,000            1 GB            No

We also have a Maximum Timer Driven Thread Count of 200 set.

Stateless NiFi would fit this perfectly, but from what I have read, it’s not available in v1.9.2? We are stuck on 1.9.2 as we are using the ZooKeeper provided as part of a Cloudera cluster. However, I’m wondering whether we would be better suited to the volatile repositories than to writing to disk?

Thanks again for the advice so far Joe, you’ve given us some confidence it’s us doing something wrong and not something with NiFi.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com]
Sent: 23 September 2020 19:05
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan

Not sure what read/write rates you'll get in these RAID-10 configs but generally this seems like it should be fine (100s of MB/sec per node range at least).  Whereas now you're seeing about 20MB/sec/node.  This is definitely very low.
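
For reference, the per-node rate can be worked out from the figures in the first message of this thread. This is only a back-of-the-envelope sketch: it assumes 1 GB = 1000 MB, an even split across the 3 nodes, and a hypothetical helper name. It does not claim to be the exact calculation behind the "about 20MB/sec/node" figure.

```python
def mb_per_sec_per_node(total_gb, seconds=300, nodes=3):
    """Average MB/s per node, taking 1 GB as 1000 MB (an assumption)."""
    return total_gb * 1000 / seconds / nodes


# Totals over 5 minutes, as reported in the first message of the thread.
rates = {
    "consumed (compressed)": mb_per_sec_per_node(9.48),
    "consumed (uncompressed)": mb_per_sec_per_node(38.53),
    "produced": mb_per_sec_per_node(16.84),
}

for label, rate in rates.items():
    print(f"{label}: {rate:.1f} MB/s per node")
```

The produced figure lands close to 20 MB/sec/node, which is roughly an order of magnitude below the 100s of MB/sec per node range mentioned above.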

If you review http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html then you'll see that we do actually capture attributes such as kafka.topic and so on.  Flowfiles would also be properly grouped by that.  What I'm not positive of is whether it could handle reading from multiple topics at the same time while also honoring and determining each of their distinct schemas.  Would need to test/verify that scenario to be sure.  If you do have a bunch of topics and they could grow/change then keeping this single processor approach makes sense.  If you can go the route of one ConsumeKafkaRecord per topic then obviously that would work well.

Without seeing your flow, though, I cannot be certain where the bottleneck(s) exist or provide guidance.  But finding them is without a doubt a vital skill for achieving maximum performance.

You'd have to show/share a ton more details for folks here to be helpful in walking through the full design.  Or explain the end to end flow.

As an additional food for thought if the flows are indeed 'from kafka -> do stuff -> back to kafka' this is likely a great use case for stateless-nifi.

Thanks

On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com> wrote:
Hi Joe,

Thanks for getting back to me so quickly.

Our disk setup is as follows:
Path      Storage Type                        Format  Capacity  Content
/         100GB OS SSD                        ext4    89.9GB    OS, NiFi install, Logs
/data/1/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Database and Flowfile Repos
/data/2/  8 x 4TB SAS Hard Drives in RAID 10  ext4    14.6TB    Content Repo
/data/3/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Provenance Repo
/ssd      1 x 4TB PCIe NVMe SSD               ext4    3.7TB     Content Repo (used instead of /data/2/ as a test, to see if CPU was bottlenecked by disk operations)

I will certainly take a look at those. One question with the consume record processor is how I would consume from multiple topics and ensure the correct Avro schema is used to deserialise the message? We have a 1:1 mapping of schemas to topics. At the moment the ConsumeKafka processor is reading from all topics in one consumer. I’m assuming the kafka.topic attribute doesn’t exist at this stage? We use the Avro Schema Registry Controller as we don’t have a schema registry in place yet.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com]
Sent: 23 September 2020 17:33
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan

You have plenty powerful machines to hit super high speeds but what I cannot tell is how the disks are setup/capability and layout wise and relative to our three repos of importance.  You'll need to share those details.

That said, the design of the flow matters.  The Kafka processors that aren't record oriented will perform poorly unless they're acquiring data in their natural batches as they arrive from kafka.  In short, use the record oriented processors from Kafka.  In it you can even deal with the fact you want to go from AVRO to Json and so on.  These processors have a tougher learning curve but they perform extremely well and we have powerful processors to go along with them for common patterns.

You absolutely should be able to get to the big numbers you have seen.  It requires great flow design (powerful machines are secondary).

Thanks

On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com> wrote:
Hi All,

We’ve got a 3-node NiFi cluster running on 3 x 40 CPU, 256GB RAM (32G Java Heap) servers. However, we have only been able to achieve a consumption rate of ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with a production rate of ~16.84GB out of the cluster over 5 minutes. This is much lower than we were expecting based on what we have read. With this throughput we see a CPU load of ~32 on all nodes, so we know there isn’t much else we can get out of the CPU.

We have also tried SSDs, Raided and Unraided HDDs for the content repo storage, but they haven’t made a difference to the amount we can process.

The process is as follows:

1.       Our flow reads from Kafka Compressed (Maximum of 2000 records per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 --> UpdateAttribute --> ConvertRecord)

2.       Depending on which topic the flow file is consumed from, we then send the message to one of 10 potential process groups, each containing between 3 and 5 processors within the process groups. (RouteOnAttribute --> Relevant Processing Group containing JoltTransformJSON and several custom processors we have made).

3.       Finally, we produce the flow file content back to one of several Kafka topics, based on the input topic name in Avro format with Snappy compression on the Kafka topic.

Inspecting the queued message counts, it indicates that the Jolt Transforms are taking the time to process (Large queues before JOLT processors, small or no queues afterwards). But I’m not sure why this is any worse than the rest of the processors as the event duration is less than a second when inspecting in provenance? We have tuned the number of concurrent tasks, duration and schedules to get the performance we have so far.

I’m not sure if there is anything anyone could recommend or suggest to try and make improvements? We need to achieve a rate around 5x of what it’s currently processing with the same number of nodes. We are running out of ideas on how to accomplish this and may have to consider alternatives.

Kind Regards,

Nathan


Re: NiFi V1.9.2 Performance

Posted by Matt Burgess <ma...@gmail.com>.
From that spec it doesn't appear that you are using values from a different
entry in the top-level array (i.e. a different message), so since the first
item in each spec is a "*", those two conditions together usually lead me
to suggest JoltTransformRecord instead (certainly for large top-level
arrays). For JoltTransformRecord you won't need the "*" as the first level
in the specs, so you won't have to refer to "[&(1)]" and "[&(3)]", the spec
is applied to each record so you can proceed knowing the output will be an
array of transformed records (basically the [&1] stuff is handled for you).
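
To make that concrete, here is a small sketch of how an array-style spec could be rewritten into the per-record form. It uses a cut-down version of the spec quoted below; the helper names are hypothetical, and the output has not been run through JoltTransformRecord here, so treat it as a starting point to verify rather than a tested spec. The "@(1,Field1)" references are left unchanged.

```python
import json
import re

# Matches a leading "[&(1)]." or "[&1]." array-index prefix in a shift target.
ARRAY_INDEX_PREFIX = re.compile(r"^\[&\(?\d+\)?\]\.")


def strip_index_prefix(node):
    """Recursively remove the array-index prefix from shift output paths."""
    if isinstance(node, dict):
        return {key: strip_index_prefix(value) for key, value in node.items()}
    if isinstance(node, str):
        return ARRAY_INDEX_PREFIX.sub("", node)
    return node


def to_per_record_spec(array_spec):
    """Drop the top-level "*" level from each operation in the chain."""
    per_record = []
    for step in array_spec:
        inner = step["spec"]["*"]  # the part that applies to one message
        if step["operation"] == "shift":
            inner = strip_index_prefix(inner)
        per_record.append({"operation": step["operation"], "spec": inner})
    return per_record


# Cut-down version of the spec from this thread.
array_spec = [
    {"operation": "modify-overwrite-beta",
     "spec": {"*": {"Field1": "=divideAndRound(0,@(1,Field1),1000000)"}}},
    {"operation": "shift",
     "spec": {"*": {"Field1": "[&(1)].OutputField1",
                    "Sub": {"0": {"SubField1": "[&(3)].OutputField4"}}}}},
]

per_record_spec = to_per_record_spec(array_spec)
print(json.dumps(per_record_spec, indent=2))
```

The shift targets come out as plain "OutputField1" and "OutputField4", since the record processor takes care of placing each transformed record in the output array.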

Regards,
Matt


On Thu, Sep 24, 2020 at 12:42 PM <na...@bt.com> wrote:

> Hi Matt,
>
>
>
> It’s one large Array for example:
>
>
>
> [ {
>     "Key1Message1": "Value1M1",
>     "Key2Message1": [ {
>         "SubKey1Message1": 1
>     } ]
> },
> {
>     "Key1Message2": "Value1M2",
>     "Key2Message2": [ {
>         "SubKey1Message2": 1
>     } ]
> } ]
>
>
>
> So we transform each message inside the array. Our Spec is a little like
> this:
>
> [{
>         "operation": "modify-overwrite-beta",
>         "spec": {
>             "*": {
>                 "Field1": "=divideAndRound(0,@(1,Field1),1000000)",
>                 "Field2": "=divideAndRound(0,@(1,Field2),1000000)"
>             }
>         }
>     }, {
>         "operation": "shift",
>         "spec": {
>             "*": {
>                 "Field1": "[&(1)].OutputField1",
>                 "Field2": "[&(1)].OutputField2",
>                 "Field3": "[&(1)].OutputField2",
>                 "Sub": {
>                     "0": {
>                         "SubField1": "[&(3)].OutputField4",
>                         "SubField2": "[&(3)].OutputField5"
>                     }
>                 }
>             }
>         }
>     }]
>
>
>
> If I understood correctly, you would recommend the JoltTransformRecord
> instead?
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
> *From:* Matt Burgess [mailto:mattyb149@apache.org]
> *Sent:* 24 September 2020 17:08
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan,
>
>
>
> If you have multiple JSON messages in one flow file, is it in one large
> array, or a top-level JSON object with an array inside? Also are you trying
> to transform each message or the whole thing (i.e. do you need to know
> about more than one message at a time)? If you have a top-level array and
> are transforming each element in the array, you might get better
> performance out of JoltTransformRecord rather than JoltTransformJSON, as
> the latter reads the entire file into memory as a single JSON entity. If
> you have a top-level object then they will both read the whole thing in.
>
>
>
> Regards,
>
> Matt
>
>
>
>
>
> On Thu, Sep 24, 2020 at 10:25 AM <na...@bt.com> wrote:
>
> Hi Mark,
>
>
>
> From what I can see (based on queues building before the processor and
> being basically empty after), it is the Jolt processor we have problems
> with. We have tried adding more concurrency, reducing the run schedule and
> increasing the run duration, but it didn’t seem to resolve the high CPU load
> (~32 when processing at the rates described in my first email; when no
> traffic is processing it sits at 0.2).
>
>
>
> It could be the completely wrong way of diagnosing this! I’ve struggled to
> find information (Apart from your great videos) to assist in getting to the
> bottom of it.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Mark Payne [mailto:markap14@hotmail.com]
> *Sent:* 24 September 2020 15:12
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Hey Nathan,
>
>
>
> A quick clarification - is the bottleneck / the slow point in your flow
> actually consuming from Kafka or Jolt? From your original message it sounds
> like the bottleneck may actually be the Jolt Transform processor?
>
>
>
> If the problem is in the ConsumeKafka processor, one thing you’ll want to
> look at is in the Settings tab, set the Yield Duration to “0 sec”. That can
> make a huge difference in performance from Kafka processors.
>
>
>
> Thanks
>
> -Mark
>
>
>
>
>
> On Sep 24, 2020, at 10:07 AM, nathan.english@bt.com wrote:
>
>
>
> Hi Bryan,
>
>
>
> Thanks for this. My understanding of the concurrent tasks was incorrect. I
> thought it was across the whole cluster, not per node.
>
>
>
> I did spend some time looking at the code for the demarcator as we had
> issues getting it batching. I think there may be a slight misunderstanding
> between my description and how it sounds.
>
>
>
> When I say an Empty string, the message demarcator isn’t blank. I have
> used the checkbox ‘Set Empty String’, which means the processor treats the
> field as Null (From memory). If I left the field empty (checkbox not
> selected), it was one Kafka message to one flow file, which was a massive
> bottleneck.
>
>
>
> I also seem to remember from when I looked at the code. The
> ConsumeKafkaRecord processors defaults the demarcator to null.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Bryan Bende [mailto:bbende@gmail.com]
> *Sent:* 24 September 2020 14:54
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Regarding the batching, I would have to double check the code, but since
> you said the demarcator is empty string, I think that means it is not
> batching and putting one message to one flow file. Basically if a
> demarcator is not set then batch size is ignored.
>
>
>
> Regarding the processors/tasks, lets take one topic with 11 partitions as
> an example, if you make a consumer processor for this topic with 1
> concurrent task, then you have 3 instances of this processor since you have
> a 3 node cluster, so you might end up with something like this...
>
>
>
> node 1 - ConsumeKafka
>
>   Task 1 - 4 partitions
>
>
>
> node 2 - ConsumeKafka
>
>   Task 1 - 4 partitions
>
>
>
> node 3 - ConsumeKafka
>
>   Task 1 - 3 partitions
>
>
>
> It may not be exactly like that, but just an example as to how it should
> be assigned.
>
>
>
> To add more parallelism you could then increase concurrent tasks up to
> maybe 4 and you get something like this...
>
>
>
> node 1 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - 1 partition
>
>
>
> node 2 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - 1 partition
>
>
>
> node 3 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - nothing
>
>
>
> If you go higher than 4 concurrent tasks you will just end up creating
> more consumers than partitions, and there is nothing to assign them.
>
>
>
>
>
> On Thu, Sep 24, 2020 at 9:30 AM <na...@bt.com> wrote:
>
> Hi Bryan,
>
>
>
> We have configured the processor to read in a maximum batch size of 2k
> messages, which does seem to give more than one Kafka message in the flow
> file.
>
>
>
> Completely understand on the Load balancing, we tried several iterations
> of 1 task to one topic partition. However, we still found it to be loaded
> towards one specific node. I will try splitting it into multiple processors
> to see if this handles it any better. We have 10 topics with 11 partitions.
> (one topic with 2 partitions). So I should set all concurrent tasks to 1
> with multiple processors (One processor per topic)?
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
> *From:* Bryan Bende [mailto:bbende@gmail.com]
> *Sent:* 24 September 2020 13:59
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> I'm not sure how many topics you have, but the biggest improvement would
> probably be to not do a single message per flow file. You want to batch
> together lots of messages coming out of ConsumeKafka using a demarcator,
> and then convert/transform them in batches. You may need to have a separate
> consumer processor for each topic in order to do this correctly.
>
>
>
> You generally shouldn't need to use a load balanced connection after
> ConsumeKafka. If one node is being favored it is likely that the number of
> partitions in your topic is not lining up with the # of nifi nodes X # of
> concurrent consumer tasks. In the simplest case, if your topic had one
> partition, and you have 3 nifi nodes with Consumekafka with 1 concurrent
> task each, then all your messages will only get consumed on one of the
> nodes. If you have 3 partitions then it should be roughly equal. In your
> case you have 3 nodes with concurrent tasks set to 12, so you have
> potentially 36 consumers, which means if you have anything less than 36
> partitions then it is not going to be balanced equally.
>
>
>
> On Thu, Sep 24, 2020 at 3:54 AM <na...@bt.com> wrote:
>
> Hi Joe,
>
>
>
> The Raids seem to give us a good IOPS number when we’ve tried testing
> them. We have seen a 300ms wait time on the Content Repo, hence why we have
> tried SSDs for the content repo as we assumed that was the bottleneck. The
> other Raids seemed OK to us.
>
>
>
> I will certainly look into the ConsumeKafkaRecord processor today and will
> come up with a solution to use it. I will feedback on what I find.
>
>
>
> In regards to our current flow configuration, we have the following
> high-level process groups:
>
> ·         ConsumeData -  Consumes the data from Kafka, Add some
> additional attributes, Converts to JSON from Avro
>
> ·         TransformData – This is where the majority of the work happens,
> it gets routed on an attribute to a set sub-process group based on the type
> of data it is (this is decided based on the kafka.topic attribute) where
> the processing happens (Explained in more detail below)
>
> ·         ProduceData – Publishes the Record back to Kafka using the
> PublishKafkaRecord processor, add some attributes to calculate processing
> duration it took to process and logs this out.
>
> ·         Error Handling – A General Catch all. Luckily we don’t see this
> triggered very often, but it logs a message and writes the file to disk.
>
> <image001.png>
>
>
>
> *ConsumeData* process group:
>
> ·         ConsumeKafka, as mentioned in earlier emails, consumes the
> messages from multiple Kafka Topics with a Message Demarcator of an empty
> string and the Max poll Records set at 2,000.
>
> ·         The Consumed Flows queue uses RoundRobin load balancing as we
> have found one node is favoured for the consumption from Kafka so wanted to
> distribute the load.
>
> ·         AddAttributes uses the UpdateAttribute to add additional
> information to the flow file (Data Type, Output Kafka Topic) based on the
> Kafka.topic attribute on the flow file.
>
> ·         Finally, ConvertRecord converts the content of the flow file
> from Avro to JSON using the AvroReader and JsonRecordSetWriter controller
> services. In addition, we use the inbuilt AvroSchemaRegistry.
>
> <image002.png>
>
>
>
> *TransformData* process group. Due to the size, it’s not easy to show in
> a screenshot.
>
> ·         Flow files are routed based on attribute to 1 of 10 processing
> groups (based on the kafka.topic attribute)
>
> ·         Each data type has its own processing group; I have gone into
> detail on one of them below
>
> ·         There are two failure output ports: one for initial routing
> issues, and another for any errors that have happened in the sub-processing
> groups. There is also the Successful output port.
>
>
>
> <image003.png>
>
>
>
>
>
> *HTTP Data Type Sub-processing group (Inside TransformData):*
>
> ·         The sub-processing groups all follow a somewhat similar layout
> but have different custom processors after the initial Jolt Transformation
>
> ·         The Jolt Transformation does the majority of the heavy lifting
> in converting the message into the right format to match the output Avro
> schema requirements. The Jolt Spec contains a chain of operations including
> modify-overwrite-beta, shift, and default
>
> ·         Each processor after the Jolt transformation does something
> different to meet a specific requirement not possible with other Nifi
> processors. Some like the IPAddressProcessor are reused in processing for
> other data types. We are planning to move into a single post-processing
> group where all data types are routed, before being published to Kafka.
>
> <image004.png>
>
>
>
> *Produce Data*, produces the messages back to different Kafka topics
> which have been defined in the output.topic (set in the consume data stage)
>
> ·         PublishKafkaRecord converts the message from JSON back to Avro
> (Different schema to input) using the JSONTreeReader and
> AvroRecordSetWriter Controller services. We have set it to Guarantee Single
> Node Delivery, and use Snappy Compression.
>
> ·         Set Exit Attributes adds a Final Processing time to the flow
> file
>
> ·         The Calculate duration uses this to work out its overall time
> processing
>
> ·         Then it’s finally logged to file so we can analyse the full
> processing time.
>
> <image005.png>
>
>
>
> Below is the scheduling settings we are using for processors:
>
> Group          Processor            Concurrent  Run       Yield     Other
>                                     Tasks       Duration  Duration
> Consume Data   ConsumeKafka         12          0         1 ms      Message Demarcator = "Empty string set"
>                                                                     Max Poll Records = 2,000
>                AddAttributes        10          0         1 s
>                ConvertRecord        10          0         1 s
> TransformData  RouteOnAttribute     10          0         1 s
>                HTTP Jolt            5           0         200 ms
>                HTTP Post Processor  2           0         1 s
> Produce Data   PublishKafka         10          0         1 s       Max Request Size = 1 MB
>                                                                     Compression Type = snappy
>                                                                     batch.size = 80,000 (Based on some custom producers we have written in the past)
>                                                                     linger.ms = 50,000 (Based on some custom producers we have written in the past)
>                Set Exit Attributes  10          0         1 s
>                Calculate Duration   10          0         1 s
>                Log Success          10          0         1 s
>
>
>
> The Queue configs are as follows:
>
>
>
> Queue                                   Back Pressure     Back Pressure   Load Balance
>                                         Object Threshold  Size Threshold
> After ConsumeKafka                      20,000            1 GB            Round Robin
> After Route On Attribute                10,000            1 GB            No
> All Other Queues In TransformData       10,000            1 GB            No
> All Failure Routes                      10,000            1 GB            No
> Between ConsumeData And TransformData   20,000            1 GB            No
> Between TransformData And ProduceData   20,000            1 GB            No
> All Other Queues In ConsumeData         20,000            1 GB            No
> Before Route On Attribute               20,000            1 GB            No
> All Queues In ProduceData               20,000            1 GB            No
>
>
>
> We also have a Maximum Timer Driven Thread Count of 200 set.
>
>
>
> Stateless NiFi would fit this perfectly, but from what I have read, it’s
> not available in v1.9.2? We are stuck on 1.9.2 as we are using the
> Zookeeper provided as part of a Cloudera cluster. However, I’m wondering if
> we would be better suited to the volatile repositories, than writing to
> disk?
>
>
>
> Thanks again for the advice so far Joe, you’ve given us some confidence
> it’s us doing something wrong and not something with NiFi.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Joe Witt [mailto:joe.witt@gmail.com]
> *Sent:* 23 September 2020 19:05
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan
>
>
>
> Not sure what read/write rates you'll get in these RAID-10 configs but
> generally this seems like it should be fine (100s of MB/sec per node range
> at least).  Whereas now you're seeing about 20MB/sec/node.  This is
> definitely very low.
>
>
>
> If you review
> http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html
> then you'll see that we do actually capture attributes such as
> kafka.topic and so on.  Flowfiles would also be properly grouped by that.
> What I'm not positive of is it could handle reading from multiple topics at
> the same time while also honoring and determining each of their distinct
> schemas.  Would need to test/verify that scenario to be sure.  If you do
> have a bunch of topics and they could grow/change then keeping this single
> processor approach makes sense.  If you can go the route of one
> ConsumeKafkaRecord per topic then obviously that would work well.
>
>
>
> Not seeing your flow though I cannot be certain where the bottleneck(s)
> exist and provide guidance.  But this is without a doubt a vital skill to
> achieving maximum performance.
>
>
>
> You'd have to show/share a ton more details for folks here to be helpful
> in walking through the full design.  Or explain the end to end flow.
>
>
>
> As an additional food for thought if the flows are indeed 'from kafka ->
> do stuff -> back to kafka' this is likely a great use case for
> stateless-nifi.
>
>
>
> Thanks
>
>
>
> On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com> wrote:
>
> Hi Joe,
>
>
>
> Thanks for getting back to me so quickly.
>
>
>
> Our disk setup is as follows:
>
> Path      Storage Type                        Format  Capacity  Content
> /         100GB OS SSD                        ext4    89.9GB    OS, NiFi install, Logs
> /data/1/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Database and Flowfile Repos
> /data/2/  8 x 4TB SAS Hard Drives in RAID 10  ext4    14.6TB    Content Repo
> /data/3/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Provenance Repo
> /ssd      1 x 4TB PCIe NVMe SSD               ext4    3.7TB     Content Repo (used instead of /data/2/ as a test, to see if CPU was bottlenecked by disk operations)
>
>
>
> I will certainly take a look at those. One question with the consume
> record processor is how I would consume from multiple topics and ensure the
> correct Avro schema is used to deserialise the message? We have 1:1 mapping
> of schemas to topics. At the moment the ConsumeKafka processor is reading
> from all topics in one consumer. I’m assuming the kafka.topic
> attribute doesn’t exist at this stage? We use the Avro Schema Registry
> Controller as we don’t have a schema registry in place yet.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Joe Witt [mailto:joe.witt@gmail.com]
> *Sent:* 23 September 2020 17:33
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan
>
>
>
> You have plenty powerful machines to hit super high speeds but what I
> cannot tell is how the disks are setup/capability and layout wise and
> relative to our three repos of importance.  You'll need to share those
> details.
>
>
>
> That said, the design of the flow matters.  The Kafka processors that
> aren't record oriented will perform poorly unless they're acquiring data in
> their natural batches as they arrive from kafka.  In short, use the record
> oriented processors from Kafka.  In it you can even deal with the fact you
> want to go from AVRO to Json and so on.  These processors have a tougher
> learning curve but they perform extremely well and we have powerful
> processors to go along with them for common patterns.
>
>
>
> You absolutely should be able to get to the big numbers you have seen.  It
> requires great flow design (powerful machines are secondary).
>
>
>
> Thanks
>
>
>
> On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com> wrote:
>
> Hi All,
>
>
>
> We’ve got a 3-node NiFi cluster running on 3 x 40 CPU, 256GB RAM (32G Java
> Heap) servers. However, we have only been able to achieve a consumption rate
> of ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with
> a production rate of ~16.84GB out of the cluster over 5 minutes. This is much
> lower than we were expecting based on what we have read. With this
> throughput we see a CPU load of ~32 on all nodes, so we know there isn’t much
> else we can get out of the CPU.
>
>
>
> We have also tried SSDs, Raided and Unraided HDDs for the content repo
> storage, but they haven’t made a difference to the amount we can process.
>
>
>
> The process is as follows:
>
> 1.       Our flow reads from Kafka Compressed (Maximum of 2000 records
> per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 -->
> UpdateAttribute --> ConvertRecord)
>
> 2.       Depending on which topic the flow file is consumed from, we then
> send the message to one of 10 potential process groups, each containing
> between 3 and 5 processors within the process groups. (RouteOnAttribute -->
> Relevant Processing Group containing JoltTransformJSON and several custom
> processors we have made).
>
> 3.       Finally, we produce the flow file content back to one of several
> Kafka topics, based on the input topic name in Avro format with Snappy
> compression on the Kafka topic.
>
>
>
> Inspecting the queued message counts, it indicates that the Jolt
> Transforms are taking the time to process (Large queues before JOLT
> processors, small or no queues afterwards). But I’m not sure why this is
> any worse than the rest of the processors as the event duration is less
> than a second when inspecting in provenance? We have tuned the number of
> concurrent tasks, duration and schedules to get the performance we have so
> far.
>
>
>
> I’m not sure if there is anything anyone could recommend or suggest to try
> and make improvements? We need to achieve a rate around 5x of what it’s
> currently processing with the same number of nodes. We are running out of
> ideas on how to accomplish this and may have to consider alternatives.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
>

RE: NiFi V1.9.2 Performance

Posted by na...@bt.com.
Hi Matt,

It’s one large Array for example:

[ {
    "Key1Message1": "Value1M1",
    "Key2Message1": [ {
        "SubKey1Message1": 1
    } ]
},
{
    "Key1Message2": "Value1M2",
    "Key2Message2": [ {
        "SubKey1Message2": 1
    } ]
} ]

So we transform each message inside the array. Our Spec is a little like this:
[{
        "operation": "modify-overwrite-beta",
        "spec": {
            "*": {
                "Field1": "=divideAndRound(0,@(1,Field1),1000000)",
                "Field2": "=divideAndRound(0,@(1,Field2),1000000)"
            }
        }
    }, {
        "operation": "shift",
        "spec": {
            "*": {
                "Field1": "[&(1)].OutputField1",
                "Field2": "[&(1)].OutputField2",
                "Field3": "[&(1)].OutputField2",
                "Sub": {
                    "0": {
                        "SubField1": "[&(3)].OutputField4",
                        "SubField2": "[&(3)].OutputField5"
                    }
                }
            }
        }
    }]
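
Every rule in this spec sits under the top-level "*" and writes back to the same array index, so each element is transformed without reference to its neighbours. A minimal Python sketch of that per-element view follows. It is only an approximation of the spec, not Jolt itself: the function names are hypothetical, Python's round() stands in for divideAndRound (the rounding mode may differ), and Field3 is left out because it shares an output key with Field2.

```python
from typing import Any, Dict, Iterable, Iterator


def transform_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """Approximate the spec for ONE array element (hypothetical helper)."""
    out: Dict[str, Any] = {}
    for src, dst in (("Field1", "OutputField1"), ("Field2", "OutputField2")):
        if src in record:
            # Stand-in for divideAndRound(0, value, 1000000); rounding mode assumed.
            out[dst] = round(record[src] / 1_000_000)
    sub = record.get("Sub") or []
    if sub:
        first = sub[0]  # the spec only reads index "0" of Sub
        for src, dst in (("SubField1", "OutputField4"), ("SubField2", "OutputField5")):
            if src in first:
                out[dst] = first[src]
    return out


def transform_stream(records: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Transform records one at a time; no element needs the rest of the array."""
    for record in records:
        yield transform_record(record)
```

Because transform_stream never needs more than one element at a time, this kind of transformation is a natural fit for record-at-a-time processing.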

If I understood correctly, you would recommend the JoltTransformRecord instead?

Kind Regards,

Nathan
From: Matt Burgess [mailto:mattyb149@apache.org]
Sent: 24 September 2020 17:08
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan,

If you have multiple JSON messages in one flow file, is it in one large array, or a top-level JSON object with an array inside? Also are you trying to transform each message or the whole thing (i.e. do you need to know about more than one message at a time)? If you have a top-level array and are transforming each element in the array, you might get better performance out of JoltTransformRecord rather than JoltTransformJSON, as the latter reads the entire file into memory as a single JSON entity. If you have a top-level object then they will both read the whole thing in.

Regards,
Matt


On Thu, Sep 24, 2020 at 10:25 AM <na...@bt.com> wrote:
Hi Mark,

From what I can see (based on queues building before the processor and being basically empty after), it is the Jolt processor we have problems with. We have tried adding more concurrency, reducing the run schedule and increasing the duration, but it didn’t seem to resolve the high CPU load (~32 when processing at the rates described in my first email; when no traffic is processing it sits at 0.2).

It could be the completely wrong way of diagnosing this! I’ve struggled to find information (Apart from your great videos) to assist in getting to the bottom of it.

Kind Regards,

Nathan

From: Mark Payne [mailto:markap14@hotmail.com]
Sent: 24 September 2020 15:12
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Hey Nathan,

A quick clarification - is the bottleneck / the slow point in your flow actually consuming from Kafka or Jolt? From your original message it sounds like the bottleneck may actually be the Jolt Transform processor?

If the problem is in the ConsumeKafka processor, one thing you’ll want to look at is in the Settings tab, set the Yield Duration to “0 sec”. That can make a huge difference in performance from Kafka processors.

Thanks
-Mark


On Sep 24, 2020, at 10:07 AM, nathan.english@bt.com wrote:

Hi Bryan,

Thanks for this. My understanding of the concurrent tasks was incorrect. I thought it was across the whole cluster, not per node.

I did spend some time looking at the code for the demarcator as we had issues getting it batching. I think there may be a slight misunderstanding between my description and how it sounds.

When I say an Empty string, the message demarcator isn’t blank. I have used the checkbox ‘Set Empty String’, which means the processor treats the field as Null (From memory). If I left the field empty (checkbox not selected), it was one Kafka message to one flow file, which was a massive bottleneck.

I also seem to remember, from when I looked at the code, that the ConsumeKafkaRecord processors default the demarcator to null.

Kind Regards,

Nathan

From: Bryan Bende [mailto:bbende@gmail.com]
Sent: 24 September 2020 14:54
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Regarding the batching, I would have to double check the code, but since you said the demarcator is empty string, I think that means it is not batching and putting one message to one flow file. Basically if a demarcator is not set then batch size is ignored.

Regarding the processors/tasks, let's take one topic with 11 partitions as an example. If you make a consumer processor for this topic with 1 concurrent task, then you have 3 instances of this processor since you have a 3 node cluster, so you might end up with something like this...

node 1 - ConsumeKafka
  Task 1 - 4 partitions

node 2 - ConsumeKafka
  Task 1 - 4 partitions

node 3 - ConsumeKafka
  Task 1 - 3 partitions

It may not be exactly like that, but just an example as to how it should be assigned.

To add more parallelism you could then increase concurrent tasks up to maybe 4 and you get something like this...

node 1 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 2 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 3 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - nothing

If you go higher than 4 concurrent tasks you will just end up creating more consumers than partitions, and there is nothing to assign them.
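
The arithmetic behind these layouts can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the function name is hypothetical, partitions are spread as evenly as possible, and consumers are numbered node by node. The real assignment is decided by Kafka's partition assignor and may order consumers differently.

```python
from typing import List


def spread_partitions(partitions: int, nodes: int, tasks_per_node: int) -> List[List[int]]:
    """Return, for each node, how many partitions each concurrent task holds.

    Assumes an even spread with consumers numbered node by node; Kafka's
    actual assignor may distribute them in a different order.
    """
    consumers = nodes * tasks_per_node
    base, extra = divmod(partitions, consumers)
    # The first `extra` consumers get one partition more than the rest.
    counts = [base + (1 if i < extra else 0) for i in range(consumers)]
    return [counts[n * tasks_per_node:(n + 1) * tasks_per_node] for n in range(nodes)]


if __name__ == "__main__":
    for tasks in (1, 4, 12):
        layout = spread_partitions(11, 3, tasks)
        idle = sum(1 for node in layout for count in node if count == 0)
        print(f"{tasks} task(s) per node: {layout} idle consumers: {idle}")
```

With 11 partitions and 3 nodes, 1 task per node gives 4/4/3 and 4 tasks per node leaves one task idle, matching the layouts above. At 12 tasks per node, 25 of the 36 consumers have nothing assigned.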


On Thu, Sep 24, 2020 at 9:30 AM <na...@bt.com> wrote:
Hi Bryan,

We have configured the processor to read in a maximum batch size of 2k messages, which does seem to have more than one Kafka message in the flow file.

Completely understand on the Load balancing, we tried several iterations of 1 task to one topic partition. However, we still found it to be loaded towards one specific node. I will try splitting it into multiple processors to see if this handles it any better. We have 10 topics with 11 partitions. (one topic with 2 partitions). So I should set all concurrent tasks to 1 with multiple processors (One processor per topic)?

Kind Regards,

Nathan
From: Bryan Bende [mailto:bbende@gmail.com]
Sent: 24 September 2020 13:59
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

I'm not sure how many topics you have, but the biggest improvement would probably be to not do a single message per flow file. You want to batch together lots of messages coming out of ConsumeKafka using a demarcator, and then convert/transform them in batches. You may need to have a separate consumer processor for each topic in order to do this correctly.

You generally shouldn't need to use a load balanced connection after ConsumeKafka. If one node is being favored it is likely that the number of partitions in your topic is not lining up with the # of nifi nodes X # of concurrent consumer tasks. In the simplest case, if your topic had one partition, and you have 3 nifi nodes with ConsumeKafka with 1 concurrent task each, then all your messages will only get consumed on one of the nodes. If you have 3 partitions then it should be roughly equal. In your case you have 3 nodes with concurrent tasks set to 12, so you have potentially 36 consumers, which means if you have anything less than 36 partitions then it is not going to be balanced equally.

On Thu, Sep 24, 2020 at 3:54 AM <na...@bt.com> wrote:
Hi Joe,

The Raids seem to give us a good IOPS number when we’ve tried testing them. We have seen a 300ms wait time on the Content Repo, hence why we have tried SSDs for the content repo as we assumed that was the bottleneck. The other Raids seemed OK to us.

I will certainly look into the ConsumeKafkaRecord processor today and will come up with a solution to use it. I will feedback on what I find.

In regards to our current flow configuration, we have the following high-level process groups:

•         ConsumeData - Consumes the data from Kafka, adds some additional attributes, converts to JSON from Avro

•         TransformData – This is where the majority of the work happens. Flow files are routed on an attribute to a set sub-process group based on the type of data (decided by the kafka.topic attribute), where the processing happens (explained in more detail below)

•         ProduceData – Publishes the Record back to Kafka using the PublishKafkaRecord processor, adds some attributes to calculate how long processing took, and logs this out.

•         Error Handling – A general catch-all. Luckily we don’t see this triggered very often, but it logs a message and writes the file to disk.
<image001.png>

ConsumeData process group:

•         ConsumeKafka, as mentioned in earlier emails, consumes the messages from multiple Kafka Topics with a Message Demarcator of an empty string and the Max poll Records set at 2,000.

•         The Consumed Flows queue uses RoundRobin load balancing as we have found one node is favoured for the consumption from Kafka so wanted to distribute the load.

•         AddAttributes uses UpdateAttribute to add additional information to the flow file (Data Type, Output Kafka Topic) based on the kafka.topic attribute on the flow file.

•         Finally, ConvertRecord converts the content of the flow file from Avro to JSON using the AvroReader and JsonRecordSetWriter Controller Services. In addition, we use the inbuilt AvroSchemaRegistry.
<image002.png>

TransformData process group. Due to the size, it’s not easy to show in a screenshot.

•         Flow files are routed based on attribute to 1 of 10 processing groups (based on the kafka.topic attribute)

•         Each Data Type has its own processing group; I have gone into detail on one below

•         There are two failure output ports: one for initial routing issues, another for any errors that have happened in the sub-processing groups. There is also the Successful output port.

<image003.png>


HTTP Data Type Sub-processing group (Inside TransformData):

•         The sub-processing groups all follow a somewhat similar layout but have different custom processors after the initial Jolt Transformation

•         The Jolt Transformation does the majority of the heavy lifting in converting the message into the right format to match the output Avro schema requirements. The Jolt Spec contains a chain of operations including modify-overwrite-beta, shift, and default

•         Each processor after the Jolt transformation does something different to meet a specific requirement not possible with other Nifi processors. Some like the IPAddressProcessor are reused in processing for other data types. We are planning to move into a single post-processing group where all data types are routed, before being published to Kafka.
<image004.png>

Produce Data, produces the messages back to different Kafka topics which have been defined in the output.topic (set in the consume data stage)

•         PublishKafkaRecord converts the message from JSON back to Avro (different schema to input) using the JsonTreeReader and AvroRecordSetWriter Controller Services. We have set it to Guarantee Single Node Delivery, and use Snappy compression.

•         Set Exit Attributes adds a Final Processing time to the flow file

•         The Calculate duration uses this to work out its overall time processing

•         Then it’s finally logged to file so we can analyse the full processing time.
<image005.png>

Below are the scheduling settings we are using for processors:
Group          Processor            Concurrent Tasks  Run Duration  Yield Duration
-------------  -------------------  ----------------  ------------  --------------
Consume Data   ConsumeKafka         12                0             1 ms
               AddAttributes        10                0             1 s
               ConvertRecord        10                0             1 s
TransformData  RouteOnAttribute     10                0             1 s
               HTTP Jolt            5                 0             200 ms
               HTTP Post Processor  2                 0             1 s
Produce Data   PublishKafka         10                0             1 s
               Set Exit Attributes  10                0             1 s
               Calculate Duration   10                0             1 s
               Log Success          10                0             1 s

Other settings:
ConsumeKafka: Message Demarcator = "Empty string set", Max Poll Records = 2,000
PublishKafka: Max Request Size = 1 MB, Compression Type = snappy, batch.size = 80,000 (Based on some custom producers we have written in the past), linger.ms = 50,000 (Based on some custom producers we have written in the past)

The Queue configs are as follows:

Queue                                   Back Pressure     Back Pressure   Load Balance
                                        Object Threshold  Size Threshold
--------------------------------------  ----------------  --------------  ------------
After ConsumeKafka                      20,000            1 GB            Round Robin
After Route On Attribute                10,000            1 GB            No
All Other Queues In TransformData
All Failure Routes                      10,000            1 GB            No
Between ConsumerData And TransformData  20,000            1 GB            No
Between TransformData And ProduceData
All Other Queues In ConsumeData
Before Route On Attribute
All Queues In ProduceData

We also have a Maximum Timer Driven Thread Count of 200 set.

Stateless NiFi would fit this perfectly, but from what I have read, it’s not available in v1.9.2? We are stuck on 1.9.2 as we are using the Zookeeper provided as part of a Cloudera cluster. However, I’m wondering if we would be better suited to the volatile repositories, than writing to disk?

Thanks again for the advice so far Joe, you’ve given us some confidence it’s us doing something wrong and not something with NiFi.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com]
Sent: 23 September 2020 19:05
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan

Not sure what read/write rates you'll get in these RAID-10 configs but generally this seems like it should be fine (100s of MB/sec per node range at least).  Whereas now you're seeing about 20MB/sec/node.  This is definitely very low.
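
For reference, the per-node rate can be worked out from the figures in the original message (9.48GB compressed in, 38.53GB uncompressed, 16.84GB out, each over 5 minutes across 3 nodes). The Python sketch below does that arithmetic. It assumes 1 GB = 1000 MB and an even spread across nodes, and the function name is purely illustrative.

```python
def mb_per_sec_per_node(gigabytes: float, seconds: float = 300, nodes: int = 3) -> float:
    """Convert a cluster-wide volume into MB/sec per node (assumes 1 GB = 1000 MB)."""
    return gigabytes * 1000 / seconds / nodes


if __name__ == "__main__":
    figures = {
        "consumed (compressed)": 9.48,
        "consumed (uncompressed)": 38.53,
        "produced": 16.84,
    }
    for label, gigabytes in figures.items():
        print(f"{label}: {mb_per_sec_per_node(gigabytes):.1f} MB/sec/node")
```

Under those assumptions the produced rate comes out at a little under 19 MB/sec/node, in line with the "about 20MB/sec/node" estimate.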

If you review http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html then you'll see that we do actually capture attributes such as kafka.topic and so on.  Flowfiles would also be properly grouped by that.  What I'm not positive of is whether it could handle reading from multiple topics at the same time while also honoring and determining each of their distinct schemas.  Would need to test/verify that scenario to be sure.  If you do have a bunch of topics and they could grow/change then keeping this single processor approach makes sense.  If you can go the route of one ConsumeKafkaRecord per topic then obviously that would work well.

Without seeing your flow, though, I cannot be certain where the bottleneck(s) exist or provide guidance.  But finding them is without a doubt a vital skill for achieving maximum performance.

You'd have to show/share a ton more details for folks here to be helpful in walking through the full design.  Or explain the end to end flow.

As an additional food for thought if the flows are indeed 'from kafka -> do stuff -> back to kafka' this is likely a great use case for stateless-nifi.

Thanks

On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com> wrote:
Hi Joe,

Thanks for getting back to me so quickly.

Our disk setup is as follows:
Path       Storage Type                        Format  Capacity  Content
---------  ----------------------------------  ------  --------  -------
/          100GB OS SSD                        ext4    89.9GB    OS, NiFi install, Logs
/data/1/   2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Database and Flowfile Repos
/data/2/   8 x 4TB SAS Hard Drives in RAID 10  ext4    14.6TB    Content Repo
/data/3/   2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Provenance Repo
/ssd       1 x 4TB PCIe NVMe SSD               ext4    3.7TB     Content Repo (used instead of /data/2/ as a test, to see if CPU was bottlenecked by disk operations)

I will certainly take a look at those. One question with the consume record processor is how I would consume from multiple topics and ensure the correct Avro schema is used to deserialise the message? We have a 1:1 mapping of schemas to topics. At the moment the ConsumeKafka processor is reading from all topics in one consumer. I’m assuming the kafka.topic attribute doesn’t exist at this stage? We use the Avro Schema Registry Controller as we don’t have a schema registry in place yet.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com]
Sent: 23 September 2020 17:33
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan

Your machines are plenty powerful enough to hit super high speeds, but what I cannot tell is how the disks are set up, capability and layout wise, relative to our three repos of importance.  You'll need to share those details.

That said, the design of the flow matters.  The Kafka processors that aren't record oriented will perform poorly unless they're acquiring data in their natural batches as they arrive from kafka.  In short, use the record oriented processors from Kafka.  In it you can even deal with the fact you want to go from AVRO to Json and so on.  These processors have a tougher learning curve but they perform extremely well and we have powerful processors to go along with them for common patterns.

You absolutely should be able to get to the big numbers you have seen.  It requires great flow design (powerful machines are secondary).

Thanks

On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com> wrote:
Hi All,

We’ve got a NiFi 3 Node Cluster running on 3 x 40 CPU, 256GB RAM (32G Java Heap) servers. However, we have only been able to achieve a consumption of ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with a production rate of ~16.84GB out of the cluster over 5 minutes. This is much lower than we were expecting based on what we have read. With this throughput we see a CPU load of ~32 on all nodes, so we know there isn’t much else we can get out of the CPU.

We have also tried SSDs, Raided and Unraided HDDs for the content repo storage, but they haven’t made a difference to the amount we can process.

The process is as follows:

1.       Our flow reads from Kafka Compressed (Maximum of 2000 records per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 --> UpdateAttribute --> ConvertRecord)

2.       Depending on which topic the flow file is consumed from, we then send the message to one of 10 potential process groups, each containing between 3 and 5 processors within the process groups. (RouteOnAttribute --> Relevant Processing Group containing JoltTransformJSON and several custom processors we have made).

3.       Finally, we produce the flow file content back to one of several Kafka topics, based on the input topic name in Avro format with Snappy compression on the Kafka topic.

Inspecting the queued message counts, it indicates that the Jolt Transforms are taking the time to process (Large queues before JOLT processors, small or no queues afterwards). But I’m not sure why this is any worse than the rest of the processors as the event duration is less than a second when inspecting in provenance? We have tuned the number of concurrent tasks, duration and schedules to get the performance we have so far.

I’m not sure if there is anything anyone could recommend or suggest to try and make improvements? We need to achieve a rate around 5x of what it’s currently processing with the same number of nodes. We are running out of ideas on how to accomplish this and may have to consider alternatives.

Kind Regards,

Nathan


Re: NiFi V1.9.2 Performance

Posted by Matt Burgess <ma...@apache.org>.
Nathan,

If you have multiple JSON messages in one flow file, is it in one large
array, or a top-level JSON object with an array inside? Also are you trying
to transform each message or the whole thing (i.e. do you need to know
about more than one message at a time)? If you have a top-level array and
are transforming each element in the array, you might get better
performance out of JoltTransformRecord rather than JoltTransformJSON, as
the latter reads the entire file into memory as a single JSON entity. If
you have a top-level object then they will both read the whole thing in.

Regards,
Matt


On Thu, Sep 24, 2020 at 10:25 AM <na...@bt.com> wrote:

> Hi Mark,
>
>
>
> From what I can see (based on queues building before the processor and
> being basically empty after), it is the Jolt processor we have problems with.
> We have tried adding more concurrency, reducing the run schedule and
> increasing the duration, but it didn’t seem to resolve the high CPU load
> (~32 when processing at the rates described in my first email, when no
> traffic is processing it sits at 0.2).
>
>
>
> It could be the completely wrong way of diagnosing this! I’ve struggled to
> find information (Apart from your great videos) to assist in getting to the
> bottom of it.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Mark Payne [mailto:markap14@hotmail.com]
> *Sent:* 24 September 2020 15:12
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Hey Nathan,
>
>
>
> A quick clarification - is the bottleneck / the slow point in your flow
> actually consuming from Kafka or Jolt? From your original message it sounds
> like the bottleneck may actually be the Jolt Transform processor?
>
>
>
> If the problem is in the ConsumeKafka processor, one thing you’ll want to
> look at is in the Settings tab, set the Yield Duration to “0 sec”. That can
> make a huge difference in performance from Kafka processors.
>
>
>
> Thanks
>
> -Mark
>
>
>
>
>
> On Sep 24, 2020, at 10:07 AM, nathan.english@bt.com wrote:
>
>
>
> Hi Bryan,
>
>
>
> Thanks for this. My understanding of the concurrent tasks was incorrect. I
> thought it was across the whole cluster, not per node.
>
>
>
> I did spend some time looking at the code for the demarcator as we had
> issues getting it batching. I think there may be a slight misunderstanding
> between my description and how it sounds.
>
>
>
> When I say an Empty string, the message demarcator isn’t blank. I have
> used the checkbox ‘Set Empty String’, which means the processor treats the
> field as Null (From memory). If I left the field empty (checkbox not
> selected), it was one Kafka message to one flow file, which was a massive
> bottleneck.
>
>
>
> I also seem to remember, from when I looked at the code, that the
> ConsumeKafkaRecord processors default the demarcator to null.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Bryan Bende [mailto:bbende@gmail.com]
> *Sent:* 24 September 2020 14:54
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Regarding the batching, I would have to double check the code, but since
> you said the demarcator is empty string, I think that means it is not
> batching and putting one message to one flow file. Basically if a
> demarcator is not set then batch size is ignored.
>
>
>
> Regarding the processors/tasks, let's take one topic with 11 partitions as
> an example. If you make a consumer processor for this topic with 1
> concurrent task, then you have 3 instances of this processor since you have
> a 3 node cluster, so you might end up with something like this...
>
>
>
> node 1 - ConsumeKafka
>
>   Task 1 - 4 partitions
>
>
>
> node 2 - ConsumeKafka
>
>   Task 1 - 4 partitions
>
>
>
> node 3 - ConsumeKafka
>
>   Task 1 - 3 partitions
>
>
>
> It may not be exactly like that, but just an example as to how it should
> be assigned.
>
>
>
> To add more parallelism you could then increase concurrent tasks up to
> maybe 4 and you get something like this...
>
>
>
> node 1 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - 1 partition
>
>
>
> node 2 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - 1 partition
>
>
>
> node 3 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - nothing
>
>
>
> If you go higher than 4 concurrent tasks you will just end up creating
> more consumers than partitions, and there is nothing to assign them.
>
>
>
>
>
> On Thu, Sep 24, 2020 at 9:30 AM <na...@bt.com> wrote:
>
> Hi Bryan,
>
>
>
> We have configured the processor to read in a maximum batch size of 2k
> messages, which does seem to have more than one Kafka message in the flow
> file.
>
>
>
> Completely understand on the Load balancing, we tried several iterations
> of 1 task to one topic partition. However, we still found it to be loaded
> towards one specific node. I will try splitting it into multiple processors
> to see if this handles it any better. We have 10 topics with 11 partitions.
> (one topic with 2 partitions). So I should set all concurrent tasks to 1
> with multiple processors (One processor per topic)?
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
> *From:* Bryan Bende [mailto:bbende@gmail.com]
> *Sent:* 24 September 2020 13:59
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> I'm not sure how many topics you have, but the biggest improvement would
> probably be to not do a single message per flow file. You want to batch
> together lots of messages coming out of ConsumeKafka using a demarcator,
> and then convert/transform them in batches. You may need to have a separate
> consumer processor for each topic in order to do this correctly.
>
>
>
> You generally shouldn't need to use a load balanced connection after
> ConsumeKafka. If one node is being favored it is likely that the number of
> partitions in your topic is not lining up with the # of nifi nodes X # of
> concurrent consumer tasks. In the simplest case, if your topic had one
> partition, and you have 3 nifi nodes with ConsumeKafka with 1 concurrent
> task each, then all your messages will only get consumed on one of the
> nodes. If you have 3 partitions then it should be roughly equal. In your
> case you have 3 nodes with concurrent tasks set to 12, so you have
> potentially 36 consumers, which means if you have anything less than 36
> partitions then it is not going to be balanced equally.
>
>
>
> On Thu, Sep 24, 2020 at 3:54 AM <na...@bt.com> wrote:
>
> Hi Joe,
>
>
>
> The Raids seem to give us a good IOPS number when we’ve tried testing
> them. We have seen a 300ms wait time on the Content Repo, hence why we have
> tried SSDs for the content repo as we assumed that was the bottleneck. The
> other Raids seemed OK to us.
>
>
>
> I will certainly look into the ConsumeKafkaRecord processor today and will
> come up with a solution to use it. I will feedback on what I find.
>
>
>
> In regards to our current flow configuration, we have the following
> high-level process groups:
>
> ·         ConsumeData -  Consumes the data from Kafka, Add some
> additional attributes, Converts to JSON from Avro
>
> ·         TransformData – This is where the majority of the work happens,
> it gets routed on an attribute to a set sub-process group based on the type
> of data it is (this is decided based on the kafka.topic attribute) where
> the processing happens (Explained in more detail below)
>
> ·         ProduceData – Publishes the Record back to Kafka using the
> PublishKafkaRecord processor, add some attributes to calculate processing
> duration it took to process and logs this out.
>
> ·         Error Handling – A General Catch all. Luckily we don’t see this
> triggered very often, but it logs a message and writes the file to disk.
>
> <image001.png>
>
>
>
> *ConsumeData* process group:
>
> ·         ConsumeKafka, as mentioned in earlier emails, consumes the
> messages from multiple Kafka Topics with a Message Demarcator of an empty
> string and the Max poll Records set at 2,000.
>
> ·         The Consumed Flows queue uses RoundRobin load balancing as we
> have found one node is favoured for the consumption from Kafka so wanted to
> distribute the load.
>
> ·         AddAttributes uses the UpdateAttribute to add additional
> information to the flow file (Data Type, Output Kafka Topic) based on the
> kafka.topic attribute on the flow file.
>
> ·         Finally, Convert Record converts the content of the flow file
> from Avro to JSON using the AvroReader and JsonRecordSetWriter Controller
> Services. In addition, we use the inbuilt AvroSchemaRegistry.
>
> <image002.png>
>
>
>
> *TransformData* process group. Due to the size, it’s not easy to show in
> a screenshot.
>
> ·         Flow files are routed based on attribute to 1 of 10 processing
> groups (based on the kafka.topic attribute)
>
> ·         Each Data Type has its own processing group; I have gone into
> detail on one below
>
> ·         There are two failure output ports: one for the initial routing
> issues, another for any errors that have happened in the sub-processing
> groups. There is also the Successful output port.
>
>
>
> <image003.png>
>
>
>
>
>
> *HTTP Data Type Sub-processing group (Inside TransformData):*
>
> ·         The sub-processing groups all follow a somewhat similar layout
> but have different custom processors after the initial Jolt Transformation
>
> ·         The Jolt Transformation does the majority of the heavy lifting
> in converting the message into the right format to match the output Avro
> schema requirements. The Jolt Spec contains a chain of operations including
> modify-overwrite-beta, shift, and default
>
> ·         Each processor after the Jolt transformation does something
> different to meet a specific requirement not possible with other Nifi
> processors. Some like the IPAddressProcessor are reused in processing for
> other data types. We are planning to move into a single post-processing
> group where all data types are routed, before being published to Kafka.
>
> <image004.png>
>
>
>
> *Produce Data*, produces the messages back to different Kafka topics
> which have been defined in the output.topic (set in the consume data stage)
>
> ·         PublishKafkaRecord converts the message from JSON back to Avro
> (Different schema to input) using the JsonTreeReader and
> AvroRecordSetWriter Controller services. We have set it to Guarantee Single
> Node Delivery, and use Snappy Compression.
>
> ·         Set Exit Attributes adds a Final Processing time to the flow
> file
>
> ·         The Calculate duration uses this to work out its overall time
> processing
>
> ·         Then it’s finally logged to file so we can analyse the full
> processing time.
>
> <image005.png>
>
>
>
> Below are the scheduling settings we are using for processors:
>
> Group          Processor            Concurrent Tasks  Run Duration  Yield Duration
> -------------  -------------------  ----------------  ------------  --------------
> Consume Data   ConsumeKafka         12                0             1 ms
>                AddAttributes        10                0             1 s
>                ConvertRecord        10                0             1 s
> TransformData  RouteOnAttribute     10                0             1 s
>                HTTP Jolt            5                 0             200 ms
>                HTTP Post Processor  2                 0             1 s
> Produce Data   PublishKafka         10                0             1 s
>                Set Exit Attributes  10                0             1 s
>                Calculate Duration   10                0             1 s
>                Log Success          10                0             1 s
>
> Other settings:
> ConsumeKafka: Message Demarcator = "Empty string set", Max Poll Records = 2,000
> PublishKafka: Max Request Size = 1 MB, Compression Type = snappy,
> batch.size = 80,000 (Based on some custom producers we have written in the
> past), linger.ms = 50,000 (Based on some custom producers we have written
> in the past)
>
>
>
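> The Queue configs are as follows:
>
> Queue                                 | Back Pressure Object Threshold | Back Pressure Size Threshold | Load Balance
> --------------------------------------+--------------------------------+------------------------------+-------------
> After ConsumeKafka                    | 20,000                         | 1 GB                         | Round Robin
> --------------------------------------+--------------------------------+------------------------------+-------------
> After Route On Attribute              | 10,000                         | 1 GB                         | No
> All Other Queues In TransformData     |                                |                              |
> --------------------------------------+--------------------------------+------------------------------+-------------
> All Failure Routes                    | 10,000                         | 1 GB                         | No
> --------------------------------------+--------------------------------+------------------------------+-------------
> Between ConsumeData And TransformData | 20,000                         | 1 GB                         | No
> Between TransformData And ProduceData |                                |                              |
> All Other Queues In ConsumeData       |                                |                              |
> Before Route On Attribute             |                                |                              |
> All Queues In ProduceData             |                                |                              |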
>
>
>
> We also have a Maximum Timer Driven Thread Count of 200 set.
>
>
>
> Stateless NiFi would fit this perfectly, but from what I have read, it’s
> not available in v1.9.2? We are stuck on 1.9.2 as we are using the
> Zookeeper provided as part of a Cloudera cluster. However, I’m wondering if
> we would be better suited to the volatile repositories, than writing to
> disk?
>
>
>
> Thanks again for the advice so far Joe, you’ve given us some confidence
> it’s us doing something wrong and not something with NiFi.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Joe Witt [mailto:joe.witt@gmail.com]
> *Sent:* 23 September 2020 19:05
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan
>
>
>
> Not sure what read/write rates you'll get in these RAID-10 configs but
> generally this seems like it should be fine (100s of MB/sec per node range
> at least).  Whereas now you're seeing about 20MB/sec/node.  This is
> definitely very low.
>
>
>
> If you review
> http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html
>  then you'll see that we do actually capture attributes such as
> kafka.topic and so on.  Flowfiles would also be properly grouped by that.
> What I'm not positive of is it could handle reading from multiple topics at
> the same time while also honoring and determining each of their distinct
> schemas.  Would need to test/verify that scenario to be sure.  If you do
> have a bunch of topics and they could grow/change then keeping this single
> processor approach makes sense.  If you can go the route of one
> ConsumeKafkaRecord per topic then obviously that would work well.
>
>
>
> Not seeing your flow though I cannot be certain where the bottleneck(s)
> exist and provide guidance.  But this is without a doubt a vital skill to
> achieving maximum performance.
>
>
>
> You'd have to show/share a ton more details for folks here to be helpful
> in walking through the full design.  Or explain the end to end flow.
>
>
>
> As an additional food for thought if the flows are indeed 'from kafka ->
> do stuff -> back to kafka' this is likely a great use case for
> stateless-nifi.
>
>
>
> Thanks
>
>
>
> On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com> wrote:
>
> Hi Joe,
>
>
>
> Thanks for getting back to me so quickly.
>
>
>
> Our disk setup is as follows:
>
> Path     | Storage Type                       | Format | Capacity | Content
> ---------+------------------------------------+--------+----------+--------
> /        | 100GB OS SSD                       | ext4   | 89.9GB   | OS, NiFi install, Logs
> /data/1/ | 2 x 4TB SAS Hard Drives in RAID 1  | ext4   | 3.7TB    | Database and Flowfile Repos
> /data/2/ | 8 x 4TB SAS Hard Drives in RAID 10 | ext4   | 14.6TB   | Content Repo
> /data/3/ | 2 x 4TB SAS Hard Drives in RAID 1  | ext4   | 3.7TB    | Provenance Repo
> /ssd     | 1 x 4TB PCIe NVMe SSD              | ext4   | 3.7TB    | Content Repo (used instead of /data/2/ as a test, to see if CPU was bottlenecked by disk operations)
>
>
>
> I will certainly take a look at those. One question with the consume
> record processor is how I would consume from multiple topics and ensure the
> correct Avro schema is used to deserialise the message? We have 1:1 mapping
> of schemas to topics. At the moment the ConsumeKafka processor is reading
> from all topics in one consumer. I’m assuming the attribute kafka.topic
> attribute doesn’t exist at this stage? We use the Avro Schema Registry
> Controller as we don’t have a schema registry in place yet.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Joe Witt [mailto:joe.witt@gmail.com]
> *Sent:* 23 September 2020 17:33
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan
>
>
>
> You have plenty powerful machines to hit super high speeds but what I
> cannot tell is how the disks are setup/capability and layout wise and
> relative to our three repos of importance.  You'll need to share those
> details.
>
>
>
> That said, the design of the flow matters.  The Kafka processors that
> aren't record oriented will perform poorly unless they're acquiring data in
> their natural batches as they arrive from kafka.  In short, use the record
> oriented processors from Kafka.  In it you can even deal with the fact you
> want to go from AVRO to Json and so on.  These processors have a tougher
> learning curve but they perform extremely well and we have powerful
> processors to go along with them for common patterns.
>
>
>
> You absolutely should be able to get to the big numbers you have seen.  It
> requires great flow design (powerful machines are secondary).
>
>
>
> Thanks
>
>
>
> On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com> wrote:
>
> Hi All,
>
>
>
> We’ve got a NiFi 3 Node Cluster running on 3 x 40 CPU, 256GB RAM (32G Java
> Heap) servers. However, we have only been able to achieve a consumption of
> ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with
> a production rate of ~16.84GB out of the cluster over 5 mins. This is much
> lower than we were expecting based on what we have read. With this
> throughput we see a CPU load of ~32 on all nodes, so we know there isn’t much
> else we can get out of the CPU.
>
>
>
> We have also tried SSDs, Raided and Unraided HDDs for the content repo
> storage, but they haven’t made a difference to the amount we can process.
>
>
>
> The process is as follows:
>
> 1.       Our flow reads from Kafka Compressed (Maximum of 2000 records
> per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 -->
> UpdateAttribute --> ConvertRecord)
>
> 2.       Depending on which topic the flow file is consumed from, we then
> send the message to one of 10 potential process groups, each containing
> between 3 and 5 processors within the process groups. (RouteOnAttribute --> Relevant
> Processing Group containing JoltTransformJSON and several custom processors
> we have made).
>
> 3.       Finally, we produce the flow file content back to one of several
> Kafka topics, based on the input topic name in Avro format with Snappy
> compression on the Kafka topic.
>
>
>
> Inspecting the queued message counts, it indicates that the Jolt
> Transforms are taking the time to process (Large queues before JOLT
> processors, small or no queues afterwards). But I’m not sure why this is
> any worse than the rest of the processors as the event duration is less
> than a second when inspecting in provenance? We have tuned the number of
> concurrent tasks, duration and schedules to get the performance we have so
> far.
>
>
>
> I’m not sure if there is anything anyone could recommend or suggest to try
> and make improvements? We need to achieve a rate around 5x of what it’s
> currently processing with the same number of nodes. We are running out of
> ideas on how to accomplish this and may have to consider alternatives.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>

RE: NiFi V1.9.2 Performance

Posted by na...@bt.com.
Hi Mark,

From what I can see (based on queues building before the processor and being basically empty after) it is the Jolt processor we have problems with. We have tried adding more concurrency, reducing the run schedule and increasing the run duration, but it didn’t seem to resolve the high CPU load (~32 when processing at the rates described in my first email; when no traffic is processing it sits at 0.2).

It could be the completely wrong way of diagnosing this! I’ve struggled to find information (Apart from your great videos) to assist in getting to the bottom of it.

Kind Regards,

Nathan

From: Mark Payne [mailto:markap14@hotmail.com]
Sent: 24 September 2020 15:12
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Hey Nathan,

A quick clarification - is the bottleneck / the slow point in your flow actually consuming from Kafka or Jolt? From your original message it sounds like the bottleneck may actually be the Jolt Transform processor?

If the problem is in the ConsumeKafka processor, one thing you’ll want to look at is in the Settings tab, set the Yield Duration to “0 sec”. That can make a huge difference in performance from Kafka processors.

Thanks
-Mark



On Sep 24, 2020, at 10:07 AM, nathan.english@bt.com wrote:

Hi Bryan,

Thanks for this. My understanding of the concurrent tasks was incorrect. I thought it was across the whole cluster, not per node.

I did spend some time looking at the code for the demarcator as we had issues getting it batching. I think there may be a slight misunderstanding between my description and how it sounds.

When I say an Empty string, the message demarcator isn’t blank. I have used the checkbox ‘Set Empty String’, which means the processor treats the field as Null (From memory). If I left the field empty (checkbox not selected), it was one Kafka message to one flow file, which was a massive bottleneck.

I also seem to remember, from when I looked at the code, that the ConsumeKafkaRecord processors default the demarcator to null.

Kind Regards,

Nathan

From: Bryan Bende [mailto:bbende@gmail.com]
Sent: 24 September 2020 14:54
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Regarding the batching, I would have to double check the code, but since you said the demarcator is empty string, I think that means it is not batching and putting one message to one flow file. Basically if a demarcator is not set then batch size is ignored.

Regarding the processors/tasks, lets take one topic with 11 partitions as an example, if you make a consumer processor for this topic with 1 concurrent task, then you have 3 instances of this processor since you have a 3 node cluster, so you might end up with something like this...

node 1 - ConsumeKafka
  Task 1 - 4 partitions

node 2 - ConsumeKafka
  Task 1 - 4 partitions

node 3 - ConsumeKafka
  Task 1 - 3 partitions

It may not be exactly like that, but just an example as to how it should be assigned.

To add more parallelism you could then increase concurrent tasks up to maybe 4 and you get something like this...

node 1 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 2 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 3 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - nothing

If you go higher than 4 concurrent tasks you will just end up creating more consumers than partitions, and there is nothing to assign them.
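
The assignment described above can be sketched in a few lines of Python. This is only an illustrative model, not NiFi or Kafka code: the function names `spread_partitions` and `per_node` are made up for the example, and ordering the consumers node by node is an assumption (Kafka's real ordering depends on consumer member IDs, so which node holds the busy or idle consumers may differ in practice).

```python
from collections import Counter


def spread_partitions(num_partitions, nodes, tasks_per_node):
    """Spread one topic's partitions evenly over nodes * tasks_per_node consumers.

    Consumers are ordered node by node, which is an assumption made for
    illustration; Kafka's real ordering depends on consumer member IDs.
    """
    consumers = [
        (node, task)
        for node in range(1, nodes + 1)
        for task in range(1, tasks_per_node + 1)
    ]
    base, extra = divmod(num_partitions, len(consumers))
    # The first `extra` consumers receive one partition more than the rest.
    return {c: base + (1 if i < extra else 0) for i, c in enumerate(consumers)}


def per_node(assignment):
    """Sum the partitions held by all tasks on each node."""
    totals = Counter()
    for (node, _task), count in assignment.items():
        totals[node] += count
    return dict(totals)


# One topic with 11 partitions on a 3 node cluster, as in the example above.
for tasks in (1, 4, 12):
    print(tasks, per_node(spread_partitions(11, nodes=3, tasks_per_node=tasks)))
```

With 1 or 4 concurrent tasks the 11 partitions land 4/4/3 across the three nodes, matching the layouts above. With 12 tasks per node there are 36 consumers for 11 partitions, so 25 of them have nothing to do, and under this ordering assumption every partition lands on node 1.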


On Thu, Sep 24, 2020 at 9:30 AM <na...@bt.com> wrote:
Hi Bryan,

We have configured the processor to read in a maximum batch size of 2k messages, which does seem to put more than one Kafka message in the flow file.

Completely understand on the Load balancing, we tried several iterations of 1 task to one topic partition. However, we still found it to be loaded towards one specific node. I will try splitting it into multiple processors to see if this handles it any better. We have 10 topics with 11 partitions. (one topic with 2 partitions). So I should set all concurrent tasks to 1 with multiple processors (One processor per topic)?

Kind Regards,

Nathan
From: Bryan Bende [mailto:bbende@gmail.com]
Sent: 24 September 2020 13:59
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

I'm not sure how many topics you have, but the biggest improvement would probably be to not do a single message per flow file. You want to batch together lots of messages coming out of ConsumeKafka using a demarcator, and then convert/transform them in batches. You may need to have a separate consumer processor for each topic in order to do this correctly.

You generally shouldn't need to use a load balanced connection after ConsumeKafka. If one node is being favored it is likely that the number of partitions in your topic is not lining up with the # of nifi nodes X # of concurrent consumer tasks. In the simplest case, if your topic had one partition, and you have 3 nifi nodes with Consumekafka with 1 concurrent task each, then all your messages will only get consumed on one of the nodes. If you have 3 partitions then it should be roughly equal. In your case you have 3 nodes with concurrent tasks set to 12, so you have potentially 36 consumers, which means if you have anything less than 36 partitions then it is not going to be balanced equally.

On Thu, Sep 24, 2020 at 3:54 AM <na...@bt.com> wrote:
Hi Joe,

The Raids seem to give us a good IOPS number when we’ve tried testing them. We have seen a 300ms wait time on the Content Repo, hence why we have tried SSDs for the content repo as we assumed that was the bottleneck. The other Raids seemed OK to us.

I will certainly look into the ConsumeKafkaRecord processor today and will come up with a solution to use it. I will feedback on what I find.

In regards to our current flow configuration, we have the following high-level process groups:

•         ConsumeData -  Consumes the data from Kafka, Add some additional attributes, Converts to JSON from Avro

•         TransformData – This is where the majority of the work happens, it gets routed on an attribute to a set sub-process group based on the type of data it is (this is decided based on the kafka.topic attribute) where the processing happens (Explained in more detail below)

•         ProduceData – Publishes the Record back to Kafka using the PublishKafkaRecord processor, adds some attributes to calculate how long processing took, and logs this out.

•         Error Handling – A General Catch all. Luckily we don’t see this triggered very often, but it logs a message and writes the file to disk.
<image001.png>

ConsumeData process group:

•         ConsumeKafka, as mentioned in earlier emails, consumes the messages from multiple Kafka Topics with a Message Demarcator of an empty string and the Max poll Records set at 2,000.

•         The Consumed Flows queue uses RoundRobin load balancing as we have found one node is favoured for the consumption from Kafka so wanted to distribute the load.

•         AddAttributes uses the UpdateAttribute to add additional information to the flow file (Data Type, Output Kafka Topic) based on the Kafka.topic attribute on the flow file.

•         Finally, Convert Record converts the content of the flow file from Avro to JSON using the AvroReader and JsonRecordSetWriter Controller Services. In addition we use the inbuilt AvroSchemaRegistry.
<image002.png>

TransformData process group. Due to the size, it’s not easy to show in a screenshot.

•         Flow files are routed based on attribute to 1 of 10 processing groups (based on the kafka.topic attribute)

•         Each Data Type has its own processing group; I have gone into detail on one below

•         There are two failure output ports, one for the initial routing issues, another for any errors that have happened in the sub-processing groups. There is also the Successful output port.

<image003.png>


HTTP Data Type Sub-processing group (Inside TransformData):

•         The sub-processing groups all follow a somewhat similar layout but have different custom processors after the initial Jolt Transformation

•         The Jolt Transformation does the majority of the heavy lifting in converting the message into the right format to match the output Avro schema requirements. The Jolt Spec contains a chain of operations including modify-overwrite-beta, shift, and default

•         Each processor after the Jolt transformation does something different to meet a specific requirement not possible with other Nifi processors. Some like the IPAddressProcessor are reused in processing for other data types. We are planning to move into a single post-processing group where all data types are routed, before being published to Kafka.
<image004.png>

Produce Data, produces the messages back to different Kafka topics which have been defined in the output.topic (set in the consume data stage)

•         PublishKafkaRecord converts the message from JSON back to Avro (Different schema to input) using the JSONTreeReader and AvroRecordSetWriter Controller services. We have set it to Guarantee Single Node Delivery, and use Snappy Compression.

•         Set Exit Attributes adds a Final Processing time to the flow file

•         The Calculate duration uses this to work out its overall time processing

•         Then it’s finally logged to file so we can analyse the full processing time.
<image005.png>

Below are the scheduling settings we are using for processors:

Group         | Processor           | Concurrent Tasks | Run Duration | Yield Duration | Other
--------------+---------------------+------------------+--------------+----------------+------
Consume Data  | ConsumeKafka        | 12               | 0            | 1 ms           | Message Demarcator = "Empty string set"
              |                     |                  |              |                | Max Poll Records = 2,000
              | AddAttributes       | 10               | 0            | 1 s            |
              | ConvertRecord       | 10               | 0            | 1 s            |
TransformData | RouteOnAttribute    | 10               | 0            | 1 s            |
              | HTTP Jolt           | 5                | 0            | 200 ms         |
              | HTTP Post Processor | 2                | 0            | 1 s            |
Produce Data  | PublishKafka        | 10               | 0            | 1 s            | Max Request Size = 1 MB
              |                     |                  |              |                | Compression Type = snappy
              |                     |                  |              |                | batch.size = 80,000 (Based on some custom producers we have written in the past)
              |                     |                  |              |                | linger.ms = 50,000 (Based on some custom producers we have written in the past)
              | Set Exit Attributes | 10               | 0            | 1 s            |
              | Calculate Duration  | 10               | 0            | 1 s            |
              | Log Success         | 10               | 0            | 1 s            |

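The Queue configs are as follows:

Queue                                 | Back Pressure Object Threshold | Back Pressure Size Threshold | Load Balance
--------------------------------------+--------------------------------+------------------------------+-------------
After ConsumeKafka                    | 20,000                         | 1 GB                         | Round Robin
--------------------------------------+--------------------------------+------------------------------+-------------
After Route On Attribute              | 10,000                         | 1 GB                         | No
All Other Queues In TransformData     |                                |                              |
--------------------------------------+--------------------------------+------------------------------+-------------
All Failure Routes                    | 10,000                         | 1 GB                         | No
--------------------------------------+--------------------------------+------------------------------+-------------
Between ConsumeData And TransformData | 20,000                         | 1 GB                         | No
Between TransformData And ProduceData |                                |                              |
All Other Queues In ConsumeData       |                                |                              |
Before Route On Attribute             |                                |                              |
All Queues In ProduceData             |                                |                              |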

We also have a Maximum Timer Driven Thread Count of 200 set.

Stateless NiFi would fit this perfectly, but from what I have read, it’s not available in v1.9.2? We are stuck on 1.9.2 as we are using the Zookeeper provided as part of a Cloudera cluster. However, I’m wondering if we would be better suited to the volatile repositories, than writing to disk?

Thanks again for the advice so far Joe, you’ve given us some confidence it’s us doing something wrong and not something with NiFi.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com]
Sent: 23 September 2020 19:05
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan

Not sure what read/write rates you'll get in these RAID-10 configs but generally this seems like it should be fine (100s of MB/sec per node range at least).  Whereas now you're seeing about 20MB/sec/node.  This is definitely very low.
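
For reference, the per-node rate can be worked out from the 5 minute totals given in the original post. The snippet below is a rough back-of-the-envelope sketch: it assumes 1 GB = 1024 MB and that the load is spread evenly over the 3 nodes, and the helper name `mb_per_sec_per_node` is made up for the example.

```python
WINDOW_SECONDS = 5 * 60  # totals in the original post are per 5 minutes
NODES = 3                # 3 node cluster


def mb_per_sec_per_node(gigabytes, seconds=WINDOW_SECONDS, nodes=NODES):
    """Convert a cluster-wide GB total over a window into MB/s per node."""
    # Assumes 1 GB = 1024 MB and an even split of the load across nodes.
    return gigabytes * 1024 / seconds / nodes


# Figures quoted in the original post (GB per 5 minutes, whole cluster).
figures = {
    "consumed (compressed)": 9.48,
    "consumed (uncompressed)": 38.53,
    "produced": 16.84,
}

for label, gb in figures.items():
    print(f"{label}: {mb_per_sec_per_node(gb):.1f} MB/s per node")
```

Under those assumptions the produced figure comes out a little under 20 MB/s per node, in line with the estimate above.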

If you review http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html then you'll see that we do actually capture attributes such as kafka.topic and so on.  Flowfiles would also be properly grouped by that.  What I'm not positive of is it could handle reading from multiple topics at the same time while also honoring and determining each of their distinct schemas.  Would need to test/verify that scenario to be sure.  If you do have a bunch of topics and they could grow/change then keeping this single processor approach makes sense.  If you can go the route of one ConsumeKafkaRecord per topic then obviously that would work well.

Not seeing your flow though I cannot be certain where the bottleneck(s) exist and provide guidance.  But this is without a doubt a vital skill to achieving maximum performance.

You'd have to show/share a ton more details for folks here to be helpful in walking through the full design.  Or explain the end to end flow.

As an additional food for thought if the flows are indeed 'from kafka -> do stuff -> back to kafka' this is likely a great use case for stateless-nifi.

Thanks

On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com> wrote:
Hi Joe,

Thanks for getting back to me so quickly.

Our disk setup is as follows:

Path     | Storage Type                       | Format | Capacity | Content
---------+------------------------------------+--------+----------+--------
/        | 100GB OS SSD                       | ext4   | 89.9GB   | OS, NiFi install, Logs
/data/1/ | 2 x 4TB SAS Hard Drives in RAID 1  | ext4   | 3.7TB    | Database and Flowfile Repos
/data/2/ | 8 x 4TB SAS Hard Drives in RAID 10 | ext4   | 14.6TB   | Content Repo
/data/3/ | 2 x 4TB SAS Hard Drives in RAID 1  | ext4   | 3.7TB    | Provenance Repo
/ssd     | 1 x 4TB PCIe NVMe SSD              | ext4   | 3.7TB    | Content Repo (used instead of /data/2/ as a test, to see if CPU was bottlenecked by disk operations)

I will certainly take a look at those. One question with the consume record processor is how I would consume from multiple topics and ensure the correct Avro schema is used to deserialise the message? We have 1:1 mapping of schemas to topics. At the moment the ConsumeKafka processor is reading from all topics in one consumer. I’m assuming the attribute kafka.topic attribute doesn’t exist at this stage? We use the Avro Schema Registry Controller as we don’t have a schema registry in place yet.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com]
Sent: 23 September 2020 17:33
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan

You have plenty powerful machines to hit super high speeds but what I cannot tell is how the disks are setup/capability and layout wise and relative to our three repos of importance.  You'll need to share those details.

That said, the design of the flow matters.  The Kafka processors that aren't record oriented will perform poorly unless they're acquiring data in their natural batches as they arrive from kafka.  In short, use the record oriented processors from Kafka.  In it you can even deal with the fact you want to go from AVRO to Json and so on.  These processors have a tougher learning curve but they perform extremely well and we have powerful processors to go along with them for common patterns.

You absolutely should be able to get to the big numbers you have seen.  It requires great flow design (powerful machines are secondary).

Thanks

On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com> wrote:
Hi All,

We’ve got a NiFi 3 Node Cluster running on 3 x 40 CPU, 256GB RAM (32G Java Heap) servers. However, we have only been able to achieve a consumption of ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with a production rate of ~16.84GB out of the cluster over 5 mins. This is much lower than we were expecting based on what we have read. With this throughput we see a CPU load of ~32 on all nodes, so we know there isn’t much else we can get out of the CPU.

We have also tried SSDs, Raided and Unraided HDDs for the content repo storage, but they haven’t made a difference to the amount we can process.

The process is as follows:

1.       Our flow reads from Kafka Compressed (Maximum of 2000 records per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 --> UpdateAttribute --> ConvertRecord)

2.       Depending on which topic the flow file is consumed from, we then send the message to one of 10 potential process groups, each containing between 3 and 5 processors within the process groups. (RouteOnAttribute --> Relevant Processing Group containing JoltTransformJSON and several custom processors we have made).

3.       Finally, we produce the flow file content back to one of several Kafka topics, based on the input topic name in Avro format with Snappy compression on the Kafka topic.

Inspecting the queued message counts, it indicates that the Jolt Transforms are taking the time to process (Large queues before JOLT processors, small or no queues afterwards). But I’m not sure why this is any worse than the rest of the processors as the event duration is less than a second when inspecting in provenance? We have tuned the number of concurrent tasks, duration and schedules to get the performance we have so far.

I’m not sure if there is anything anyone could recommend or suggest to try and make improvements? We need to achieve a rate around 5x of what it’s currently processing with the same number of nodes. We are running out of ideas on how to accomplish this and may have to consider alternatives.

Kind Regards,

Nathan


Re: NiFi V1.9.2 Performance

Posted by Mark Payne <ma...@hotmail.com>.
Hey Nathan,

A quick clarification - is the bottleneck / the slow point in your flow actually consuming from Kafka or Jolt? From your original message it sounds like the bottleneck may actually be the Jolt Transform processor?

If the problem is in the ConsumeKafka processor, one thing you’ll want to look at is in the Settings tab, set the Yield Duration to “0 sec”. That can make a huge difference in performance from Kafka processors.

Thanks
-Mark


On Sep 24, 2020, at 10:07 AM, nathan.english@bt.com wrote:

Hi Bryan,

Thanks for this. My understanding of the concurrent tasks was incorrect. I thought it was across the whole cluster, not per node.

I did spend some time looking at the code for the demarcator as we had issues getting it batching. I think there may be a slight misunderstanding between my description and how it sounds.

When I say an Empty string, the message demarcator isn’t blank. I have used the checkbox ‘Set Empty String’, which means the processor treats the field as Null (From memory). If I left the field empty (checkbox not selected), it was one Kafka message to one flow file, which was a massive bottleneck.

I also seem to remember, from when I looked at the code, that the ConsumeKafkaRecord processors default the demarcator to null.

Kind Regards,

Nathan

From: Bryan Bende [mailto:bbende@gmail.com]
Sent: 24 September 2020 14:54
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Regarding the batching, I would have to double check the code, but since you said the demarcator is empty string, I think that means it is not batching and putting one message to one flow file. Basically if a demarcator is not set then batch size is ignored.

Regarding the processors/tasks, lets take one topic with 11 partitions as an example, if you make a consumer processor for this topic with 1 concurrent task, then you have 3 instances of this processor since you have a 3 node cluster, so you might end up with something like this...

node 1 - ConsumeKafka
  Task 1 - 4 partitions

node 2 - ConsumeKafka
  Task 1 - 4 partitions

node 3 - ConsumeKafka
  Task 1 - 3 partitions

It may not be exactly like that, but just an example as to how it should be assigned.

To add more parallelism you could then increase concurrent tasks up to maybe 4 and you get something like this...

node 1 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 2 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 3 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - nothing

If you go higher than 4 concurrent tasks you will just end up creating more consumers than partitions, and there is nothing to assign them.


On Thu, Sep 24, 2020 at 9:30 AM <na...@bt.com> wrote:
Hi Bryan,

We have configured the processor to read in a maximum batch size of 2k messages, which does seem to put more than one Kafka message in the flow file.

Completely understand on the Load balancing, we tried several iterations of 1 task to one topic partition. However, we still found it to be loaded towards one specific node. I will try splitting it into multiple processors to see if this handles it any better. We have 10 topics with 11 partitions. (one topic with 2 partitions). So I should set all concurrent tasks to 1 with multiple processors (One processor per topic)?

Kind Regards,

Nathan
From: Bryan Bende [mailto:bbende@gmail.com]
Sent: 24 September 2020 13:59
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

I'm not sure how many topics you have, but the biggest improvement would probably be to not do a single message per flow file. You want to batch together lots of messages coming out of ConsumeKafka using a demarcator, and then convert/transform them in batches. You may need to have a separate consumer processor for each topic in order to do this correctly.

You generally shouldn't need to use a load balanced connection after ConsumeKafka. If one node is being favored it is likely that the number of partitions in your topic is not lining up with the # of nifi nodes X # of concurrent consumer tasks. In the simplest case, if your topic had one partition, and you have 3 nifi nodes with Consumekafka with 1 concurrent task each, then all your messages will only get consumed on one of the nodes. If you have 3 partitions then it should be roughly equal. In your case you have 3 nodes with concurrent tasks set to 12, so you have potentially 36 consumers, which means if you have anything less than 36 partitions then it is not going to be balanced equally.

On Thu, Sep 24, 2020 at 3:54 AM <na...@bt.com> wrote:
Hi Joe,

The Raids seem to give us a good IOPS number when we’ve tried testing them. We have seen a 300ms wait time on the Content Repo, hence why we have tried SSDs for the content repo as we assumed that was the bottleneck. The other Raids seemed OK to us.

I will certainly look into the ConsumeKafkaRecord processor today and will come up with a solution to use it. I will feedback on what I find.

In regards to our current flow configuration, we have the following high-level process groups:

•         ConsumeData - Consumes the data from Kafka, adds some additional attributes, and converts it from Avro to JSON.

•         TransformData – This is where the majority of the work happens. Flow files are routed on an attribute to a sub-process group based on the type of data (decided by the kafka.topic attribute), where the processing happens (explained in more detail below).

•         ProduceData – Publishes the records back to Kafka using the PublishKafkaRecord processor, adds some attributes to calculate the processing duration, and logs this out.

•         Error Handling – A General Catch all. Luckily we don’t see this triggered very often, but it logs a message and writes the file to disk.

ConsumeData process group:

•         ConsumeKafka, as mentioned in earlier emails, consumes the messages from multiple Kafka Topics with a Message Demarcator of an empty string and the Max poll Records set at 2,000.

•         The Consumed Flows queue uses RoundRobin load balancing as we have found one node is favoured for the consumption from Kafka so wanted to distribute the load.

•         AddAttributes uses UpdateAttribute to add additional information to the flow file (Data Type, Output Kafka Topic) based on the kafka.topic attribute on the flow file.

•         Finally, ConvertRecord converts the content of the flow file from Avro to JSON using the AvroReader and JsonRecordSetWriter Controller Services. In addition, we use the inbuilt AvroSchemaRegistry.

TransformData process group. Due to the size, it’s not easy to show in a screenshot.

•         Flow files are routed based on attribute to 1 of 10 processing groups (based on the kafka.topic attribute)

•         Each data type has its own processing group; I have gone into detail on one below.

•         There are two failure output ports: one for initial routing issues, another for any errors that have happened in the sub-processing groups. There is also the Successful output port.


HTTP Data Type Sub-processing group (Inside TransformData):

•         The sub-processing groups all follow a somewhat similar layout but have different custom processors after the initial Jolt Transformation

•         The Jolt Transformation does the majority of the heavy lifting in converting the message into the right format to match the output Avro schema requirements. The Jolt Spec contains a chain of operations including modify-overwrite-beta, shift, and default

•         Each processor after the Jolt transformation does something different to meet a specific requirement not possible with other NiFi processors. Some, like the IPAddressProcessor, are reused in processing for other data types. We are planning to move these into a single post-processing group where all data types are routed, before being published to Kafka.
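For readers unfamiliar with Jolt chains, the shape of such a spec can be sketched as below. Only the three operation names come from the description above; every field name and value in the specs is hypothetical and does not reflect the actual HTTP data type.

```python
import json

# Hypothetical chain spec: the operation names are the ones mentioned in the
# thread, while all field names and defaults are invented for illustration.
chain_spec = [
    # Coerce a field to a long before it is moved.
    {"operation": "modify-overwrite-beta",
     "spec": {"bytes": "=toLong(@(1,bytes))"}},
    # Move flat input fields into the nested output layout.
    {"operation": "shift",
     "spec": {"src_ip": "source.ip", "bytes": "http.bytes"}},
    # Fill in values the output schema requires when the input lacks them.
    {"operation": "default",
     "spec": {"http": {"method": "UNKNOWN"}}},
]

# The JSON text is what would be pasted into the processor's Jolt Specification
# property, with the transformation type set to Chain.
print(json.dumps(chain_spec, indent=2))
```

Each operation in a chain walks the whole document again, so a long chain applied to one record at a time is a plausible place for CPU time to accumulate.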

Produce Data, produces the messages back to different Kafka topics which have been defined in the output.topic (set in the consume data stage)

•         PublishKafkaRecord converts the message from JSON back to Avro (different schema to the input) using the JsonTreeReader and AvroRecordSetWriter Controller Services. We have set it to Guarantee Single Node Delivery, and use Snappy compression.

•         Set Exit Attributes adds a final processing time to the flow file

•         Calculate Duration uses this to work out the overall processing time

•         Then it’s finally logged to file so we can analyse the full processing time.

Below are the scheduling settings we are using for processors:
Group          Processor            Concurrent Tasks  Run Duration  Yield Duration  Other
-------------  -------------------  ----------------  ------------  --------------  -----
Consume Data   ConsumeKafka         12                0             1 ms            Message Demarcator = "Empty string set"
                                                                                    Max Poll Records = 2,000
               AddAttributes        10                0             1 s
               ConvertRecord        10                0             1 s
TransformData  RouteOnAttribute     10                0             1 s
               HTTP Jolt            5                 0             200 ms
               HTTP Post Processor  2                 0             1 s
Produce Data   PublishKafka         10                0             1 s             Max Request Size = 1 MB
                                                                                    Compression Type = snappy
                                                                                    batch.size = 80,000 (Based on some custom producers we have written in the past)
                                                                                    linger.ms = 50,000 (Based on some custom producers we have written in the past)
               Set Exit Attributes  10                0             1 s
               Calculate Duration   10                0             1 s
               Log Success          10                0             1 s

The Queue configs are as follows:

Queue                                   Back Pressure Object Threshold  Back Pressure Size Threshold  Load Balance
--------------------------------------  ------------------------------  ----------------------------  ------------
After ConsumeKafka                      20,000                          1 GB                          Round Robin
After Route On Attribute,               10,000                          1 GB                          No
All Other Queues In TransformData
All Failure Routes                      10,000                          1 GB                          No
Between ConsumerData And TransformData, 20,000                          1 GB                          No
Between TransformData And ProduceData,
All Other Queues In ConsumeData,
Before Route On Attribute,
All Queues In ProduceData

We also have a Maximum Timer Driven Thread Count of 200 set.
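As a rough check of how the scheduling settings above compare with the thread pool and the CPU count, the concurrent tasks can be summed per node. This sketch assumes that all 10 data type groups use the same two settings as the HTTP group, which the thread does not confirm, and it ignores any processors not listed in the table.

```python
# Concurrent tasks per node for the processors shared by every data type,
# taken from the scheduling table in this message.
shared_tasks = {
    "ConsumeKafka": 12,
    "AddAttributes": 10,
    "ConvertRecord": 10,
    "RouteOnAttribute": 10,
    "PublishKafka": 10,
    "Set Exit Attributes": 10,
    "Calculate Duration": 10,
    "Log Success": 10,
}
# Assumption: every data type group mirrors the HTTP group's settings.
per_group_tasks = {"Jolt": 5, "Post Processor": 2}
DATA_TYPE_GROUPS = 10
MAX_TIMER_DRIVEN_THREADS = 200
CPUS_PER_NODE = 40


def requested_tasks(shared, per_group, groups):
    """Total concurrent tasks that could be asked for on one node."""
    return sum(shared.values()) + groups * sum(per_group.values())


total = requested_tasks(shared_tasks, per_group_tasks, DATA_TYPE_GROUPS)
print(f"requested tasks per node: {total}")
print(f"fits in thread pool: {total <= MAX_TIMER_DRIVEN_THREADS}")
print(f"tasks per CPU: {total / CPUS_PER_NODE:.1f}")
```

Under these assumptions the flow can ask for more runnable tasks than there are CPUs on a node, which would be consistent with the high CPU load reported at the start of the thread.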

Stateless NiFi would fit this perfectly, but from what I have read, it’s not available in v1.9.2? We are stuck on 1.9.2 as we are using the Zookeeper provided as part of a Cloudera cluster. However, I’m wondering if we would be better suited to the volatile repositories, than writing to disk?

Thanks again for the advice so far Joe, you’ve given us some confidence it’s us doing something wrong and not something with NiFi.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com]
Sent: 23 September 2020 19:05
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan

Not sure what read/write rates you'll get in these RAID-10 configs but generally this seems like it should be fine (100s of MB/sec per node range at least).  Whereas now you're seeing about 20MB/sec/node.  This is definitely very low.
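The per-node figure can be reproduced from the volumes quoted in the original message. The sketch below assumes the load is spread evenly over the 3 nodes and takes 1 GB as 1000 MB; the function name is illustrative.

```python
def mb_per_sec_per_node(gigabytes, seconds=300, nodes=3):
    """Average MB/s per node for a volume moved by the whole cluster.

    Assumes an even spread across nodes and 1 GB = 1000 MB.
    """
    return gigabytes * 1000 / seconds / nodes


consumed_compressed = mb_per_sec_per_node(9.48)     # about 10.5 MB/s per node
consumed_uncompressed = mb_per_sec_per_node(38.53)  # about 42.8 MB/s per node
produced = mb_per_sec_per_node(16.84)               # about 18.7 MB/s per node

print(f"consumed (compressed):   {consumed_compressed:.1f} MB/s/node")
print(f"consumed (uncompressed): {consumed_uncompressed:.1f} MB/s/node")
print(f"produced:                {produced:.1f} MB/s/node")
print(f"5x target for produced:  {5 * produced:.1f} MB/s/node")
```

The produced rate is the one closest to the 20MB/sec/node mentioned above, and five times that rate is still below 100 MB/s per node.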

If you review http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html then you'll see that we do actually capture attributes such as kafka.topic and so on.  Flowfiles would also be properly grouped by that.  What I'm not positive of is whether it could handle reading from multiple topics at the same time while also honoring and determining each of their distinct schemas.  Would need to test/verify that scenario to be sure.  If you do have a bunch of topics and they could grow/change then keeping this single processor approach makes sense.  If you can go the route of one ConsumeKafkaRecord per topic then obviously that would work well.

Without seeing your flow, though, I cannot be certain where the bottleneck(s) are or provide guidance.  Finding them is without a doubt a vital skill for achieving maximum performance.

You'd have to show/share a ton more details for folks here to be helpful in walking through the full design.  Or explain the end to end flow.

As an additional food for thought if the flows are indeed 'from kafka -> do stuff -> back to kafka' this is likely a great use case for stateless-nifi.

Thanks

On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com> wrote:
Hi Joe,

Thanks for getting back to me so quickly.

Our disk setup is as follows:
Path      Storage Type                        Format  Capacity  Content
--------  ----------------------------------  ------  --------  -------
/         100GB OS SSD                        ext4    89.9GB    OS, NiFi install, Logs
/data/1/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Database and Flowfile Repos
/data/2/  8 x 4TB SAS Hard Drives in RAID 10  ext4    14.6TB    Content Repo
/data/3/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Provenance Repo
/ssd      1 x 4TB PCIe NVMe SSD               ext4    3.7TB     Content Repo (used instead of /data/2/ as a test, to see if the CPU was bottlenecked by disk operations)

I will certainly take a look at those. One question with the consume record processor is how I would consume from multiple topics and ensure the correct Avro schema is used to deserialise the message? We have a 1:1 mapping of schemas to topics. At the moment the ConsumeKafka processor is reading from all topics in one consumer. I’m assuming the kafka.topic attribute doesn’t exist at this stage? We use the Avro Schema Registry Controller as we don’t have a schema registry in place yet.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com]
Sent: 23 September 2020 17:33
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan

You have plenty powerful machines to hit super high speeds but what I cannot tell is how the disks are setup/capability and layout wise and relative to our three repos of importance.  You'll need to share those details.

That said, the design of the flow matters.  The Kafka processors that aren't record oriented will perform poorly unless they're acquiring data in their natural batches as they arrive from kafka.  In short, use the record oriented processors from Kafka.  In it you can even deal with the fact you want to go from AVRO to Json and so on.  These processors have a tougher learning curve but they perform extremely well and we have powerful processors to go along with them for common patterns.

You absolutely should be able to get to the big numbers you have seen.  It requires great flow design (powerful machines are secondary).

Thanks

On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com> wrote:
Hi All,

We’ve got a NiFi 3 Node Cluster running on 3 x 40 CPU, 256GB RAM (32G Java Heap) servers. However, we have only been able to achieve a consumption of ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with a production rate of ~16.84GB out of the cluster over 5 minutes. This is much lower than we were expecting based on what we have read. With this throughput we see a CPU load of ~32 on all nodes, so we know there isn’t much else we can get out of the CPU.

We have also tried SSDs, Raided and Unraided HDDs for the content repo storage, but they haven’t made a difference to the amount we can process.

The process is as follows:

1.       Our flow reads from Kafka Compressed (Maximum of 2000 records per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 --> UpdateAttribute --> ConvertRecord)

2.       Depending on which topic the flow file is consumed from, we then send the message to one of 10 potential process groups, each containing between 3 and 5 processors within the process groups. (RouteOnAttribute --> Relevant Processing Group containing JoltTransformJSON and several custom processors we have made).

3.       Finally, we produce the flow file content back to one of several Kafka topics, based on the input topic name in Avro format with Snappy compression on the Kafka topic.


Inspecting the queued message counts, it indicates that the Jolt Transforms are taking the time to process (Large queues before JOLT processors, small or no queues afterwards). But I’m not sure why this is any worse than the rest of the processors as the event duration is less than a second when inspecting in provenance? We have tuned the number of concurrent tasks, duration and schedules to get the performance we have so far.

I’m not sure if there is anything anyone could recommend or suggest to try and make improvements? We need to achieve a rate around 5x of what it’s currently processing with the same number of nodes. We are running out of ideas on how to accomplish this and may have to consider alternatives.

Kind Regards,

Nathan


RE: NiFi V1.9.2 Performance

Posted by na...@bt.com.
Hi Bryan,

Thanks for this. My understanding of the concurrent tasks was incorrect. I thought it was across the whole cluster, not per node.

I did spend some time looking at the code for the demarcator as we had issues getting it batching. I think there may be a slight misunderstanding between my description and how it sounds.

When I say an Empty string, the message demarcator isn’t blank. I have used the checkbox ‘Set Empty String’, which means the processor treats the field as Null (From memory). If I left the field empty (checkbox not selected), it was one Kafka message to one flow file, which was a massive bottleneck.

I also seem to remember, from when I looked at the code, that the ConsumeKafkaRecord processor defaults the demarcator to null.

Kind Regards,

Nathan

From: Bryan Bende [mailto:bbende@gmail.com]
Sent: 24 September 2020 14:54
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Regarding the batching, I would have to double check the code, but since you said the demarcator is empty string, I think that means it is not batching and putting one message to one flow file. Basically if a demarcator is not set then batch size is ignored.

Regarding the processors/tasks, let's take one topic with 11 partitions as an example. If you make a consumer processor for this topic with 1 concurrent task, then you have 3 instances of this processor since you have a 3 node cluster, so you might end up with something like this...

node 1 - ConsumeKafka
  Task 1 - 4 partitions

node 2 - ConsumeKafka
  Task 1 - 4 partitions

node 3 - ConsumeKafka
  Task 1 - 3 partitions

It may not be exactly like that, but just an example as to how it should be assigned.

To add more parallelism you could then increase concurrent tasks up to maybe 4 and you get something like this...

node 1 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 2 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 3 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - nothing

If you go higher than 4 concurrent tasks you will just end up creating more consumers than partitions, and there is nothing to assign them.


On Thu, Sep 24, 2020 at 9:30 AM <na...@bt.com> wrote:
Hi Bryan,

We have configured the processor to read in a maximum batch size of 2k messages, which does seem to produce more than one Kafka message in the flow file.

Completely understand on the Load balancing, we tried several iterations of 1 task to one topic partition. However, we still found it to be loaded towards one specific node. I will try splitting it into multiple processors to see if this handles it any better. We have 10 topics with 11 partitions. (one topic with 2 partitions). So I should set all concurrent tasks to 1 with multiple processors (One processor per topic)?

Kind Regards,

Nathan

Re: NiFi V1.9.2 Performance

Posted by Bryan Bende <bb...@gmail.com>.
Regarding the batching, I would have to double check the code, but since
you said the demarcator is empty string, I think that means it is not
batching and putting one message to one flow file. Basically if a
demarcator is not set then batch size is ignored.
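The general rule in the last sentence can be sketched as follows. This paraphrases the behaviour described in this thread rather than the NiFi source, the function and parameter names are hypothetical, and it does not settle how the 'Set Empty String' checkbox is treated.

```python
def to_flow_files(messages, demarcator=None, max_poll_records=2000):
    """Group the messages of one poll into flow-file payloads.

    Assumed behaviour, as described in the thread: without a demarcator
    every message becomes its own flow file; with a demarcator, up to
    max_poll_records messages are joined into a single flow file.
    """
    batch = messages[:max_poll_records]
    if demarcator is None:
        # One Kafka message per flow file: the batch size has no effect.
        return list(batch)
    # One flow file holding the whole batch, separated by the demarcator.
    return [demarcator.join(batch)] if batch else []


polled = [b"msg-1", b"msg-2", b"msg-3"]
unbatched = to_flow_files(polled)
batched = to_flow_files(polled, demarcator=b"\n")
print(len(unbatched), len(batched))
```

With three polled messages, the unbatched call yields three flow files and the batched call yields one, which is the difference that matters for per-flow-file overhead downstream.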

Regarding the processors/tasks, let's take one topic with 11 partitions as
an example, if you make a consumer processor for this topic with 1
concurrent task, then you have 3 instances of this processor since you have
a 3 node cluster, so you might end up with something like this...

node 1 - ConsumeKafka
  Task 1 - 4 partitions

node 2 - ConsumeKafka
  Task 1 - 4 partitions

node 3 - ConsumeKafka
  Task 1 - 3 partitions

It may not be exactly like that, but just an example as to how it should be
assigned.

To add more parallelism you could then increase concurrent tasks up to
maybe 4 and you get something like this...

node 1 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 2 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 3 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - nothing

If you go higher than 4 concurrent tasks you will just end up creating more
consumers than partitions, and there is nothing to assign them.
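The limit of 4 follows from dividing the partitions over the nodes. A minimal sketch, assuming one consumer processor per topic, the same concurrent task setting on every node, and at most one consumer per partition; the function names are illustrative.

```python
import math


def useful_concurrent_tasks(partitions, nodes):
    """Smallest per-node task count that gives every partition its own consumer."""
    return math.ceil(partitions / nodes)


def idle_consumers(partitions, nodes, tasks_per_node):
    """Consumers left with no partition, given at most one consumer per partition."""
    return max(0, nodes * tasks_per_node - partitions)


print(useful_concurrent_tasks(11, nodes=3))            # 4, as in the example above
print(idle_consumers(11, nodes=3, tasks_per_node=4))   # 1 consumer gets nothing
print(idle_consumers(11, nodes=3, tasks_per_node=12))  # 25 consumers get nothing
print(useful_concurrent_tasks(2, nodes=3))             # 1 for the 2-partition topic
```

For the topic with 2 partitions, even a single task per node leaves one node without a partition for that topic, so its traffic cannot be spread over all three nodes by the consumer alone.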


On Thu, Sep 24, 2020 at 9:30 AM <na...@bt.com> wrote:

> Hi Bryan,
>
>
>
> We have configured the processor to read in a maximum batch size of 2k
> messages, which does seem to produce more than one Kafka message in the flow
> file.
>
>
>
> Completely understand on the Load balancing, we tried several iterations
> of 1 task to one topic partition. However, we still found it to be loaded
> towards one specific node. I will try splitting it into multiple processors
> to see if this handles it any better. We have 10 topics with 11 partitions.
> (one topic with 2 partitions). So I should set all concurrent tasks to 1
> with multiple processors (One processor per topic)?
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
> *From:* Bryan Bende [mailto:bbende@gmail.com]
> *Sent:* 24 September 2020 13:59
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> I'm not sure how many topics you have, but the biggest improvement would
> probably be to not do a single message per flow file. You want to batch
> together lots of messages coming out of ConsumeKafka using a demarcator,
> and then convert/transform them in batches. You may need to have a separate
> consumer processor for each topic in order to do this correctly.
>
>
>
> You generally shouldn't need to use a load balanced connection after
> ConsumeKafka. If one node is being favored it is likely that the number of
> partitions in your topic is not lining up with the # of nifi nodes X # of
> concurrent consumer tasks. In the simplest case, if your topic had one
> partition, and you have 3 nifi nodes with Consumekafka with 1 concurrent
> task each, then all your messages will only get consumed on one of the
> nodes. If you have 3 partitions then it should be roughly equal. In your
> case you have 3 nodes with concurrent tasks set to 12, so you have
> potentially 36 consumers, which means if you have anything less than 36
> partitions then it is not going to be balanced equally.
>
>
>
> On Thu, Sep 24, 2020 at 3:54 AM <na...@bt.com> wrote:
>
> Hi Joe,
>
>
>
> The Raids seem to give us a good IOPS number when we’ve tried testing
> them. We have seen a 300ms wait time on the Content Repo, hence why we have
> tried SSDs for the content repo as we assumed that was the bottleneck. The
> other Raids seemed OK to us.
>
>
>
> I will certainly look into the ConsumeKafkaRecord processor today and will
> come up with a solution to use it. I will feedback on what I find.
>
>
>
> In regards to our current flow configuration, we have the following
> high-level process groups:
>
> ·         ConsumeData -  Consumes the data from Kafka, Add some
> additional attributes, Converts to JSON from Avro
>
> ·         TransformData – This is where the majority of the work happens,
> it gets routed on an attribute to a set sub-process group based on the type
> of data it is (this is decided based on the kafka.topic attribute) where
> the processing happens (Explained in more detail below)
>
> ·         ProduceData – Publishes the Record back to Kafka using the
> PublishKafkaRecord processor, add some attributes to calculate processing
> duration it took to process and logs this out.
>
> ·         Error Handling – A General Catch all. Luckily we don’t see this
> triggered very often, but it logs a message and writes the file to disk.
>
>
>
> *ConsumeData* process group:
>
> ·         ConsumeKafka, as mentioned in earlier emails, consumes the
> messages from multiple Kafka Topics with a Message Demarcator of an empty
> string and the Max poll Records set at 2,000.
>
> ·         The Consumed Flows queue uses RoundRobin load balancing as we
> have found one node is favoured for the consumption from Kafka so wanted to
> distribute the load.
>
> ·         AddAttributes uses the UpdateAttribute to add additional
> information to the flow file (Data Type, Output Kafka Topic) based on the
> Kafka.topic attribute on the flow file.
>
> ·         Finally, Convert Record converts the content of the flow file
> from Avro to JSON using the AvroRead and JsonRecordSetWriter Controller
> Services. In addition, we use the inbuilt AvroSchemaRegistry.
>
>
>
> *TransformData* process group. Due to the size, it’s not easy to show in
> a screenshot.
>
> ·         Flow files are routed based on attribute to 1 of 10 processing
> groups (based on the kafka.topic attribute)
>
> ·         Each Data Type has its own processing group; I have gone into
> detail on one below
>
> ·         There are two failure output ports one for the initial routing
> issues, another for any errors that have happened in the sub-processing
> groups. There is also the Successful output port.
>
>
>
>
>
>
>
> *HTTP Data Type Sub-processing group (Inside TransformData):*
>
> ·         The sub-processing groups all follow a somewhat similar layout
> but have different custom processors after the initial Jolt Transformation
>
> ·         The Jolt Transformation does the majority of the heavy lifting
> in converting the message into the right format to match the output Avro
> schema requirements. The Jolt Spec contains a chain of operations including
> modify-overwrite-beta, shift, and default
>
> ·         Each processor after the Jolt transformation does something
> different to meet a specific requirement not possible with other Nifi
> processors. Some like the IPAddressProcessor are reused in processing for
> other data types. We are planning to move into a single post-processing
> group where all data types are routed, before being published to Kafka.
>
>
>
> *Produce Data*, produces the messages back to different Kafka topics
> which have been defined in the output.topic (set in the consume data stage)
>
> ·         PublishKafkaRecord converts the message from JSON back to Avro
> (Different schema to input) using the JSONTreeReader and
> AvroRecordSetWriter Controller services. We have set it to Guarantee Single
> Node Delivery, and use Snappy Compression.
>
> ·         Set Exit Attributes adds a Final Processing time to the flow
> file
>
> ·         The Calculate duration uses this to work out its overall time
> processing
>
> ·         Then it’s finally logged to file so we can analyse the full
> processing time.
>
>
>
> Below are the scheduling settings we are using for processors:
>
> Group          Processor            Concurrent  Run       Yield     Other
>                                     Tasks       Duration  Duration
> -------------  -------------------  ----------  --------  --------  -----
> Consume Data   ConsumeKafka         12          0         1 ms      Message Demarcator = "Empty string set"
>                                                                     Max Poll Records = 2,000
>                AddAttributes        10          0         1 s
>                ConvertRecord        10          0         1 s
> TransformData  RouteOnAttribute     10          0         1 s
>                HTTP Jolt            5           0         200 ms
>                HTTP Post Processor  2           0         1 s
> Produce Data   PublishKafka         10          0         1 s       Max Request Size = 1 MB
>                                                                     Compression Type = snappy
>                                                                     batch.size = 80,000 (Based on some custom producers we have written in the past)
>                                                                     linger.ms = 50,000 (Based on some custom producers we have written in the past)
>                Set Exit Attributes  10          0         1 s
>                Calculate Duration   10          0         1 s
>                Log Success          10          0         1 s
>
>
>
> The Queue configs are as follows:
>
>
>
> Queue                                   Back Pressure     Back Pressure   Load Balance
>                                         Object Threshold  Size Threshold
> --------------------------------------  ----------------  --------------  ------------
> After ConsumeKafka                      20,000            1 GB            Round Robin
>
> After Route On Attribute                10,000            1 GB            No
> All Other Queues In TransformData
>
> All Failure Routes                      10,000            1 GB            No
>
> Between ConsumerData And TransformData  20,000            1 GB            No
> Between TransformData And ProduceData
> All Other Queues In ConsumeData
> Before Route On Attribute
> All Queues In ProduceData
>
>
>
> We also have a Maximum Timer Driven Thread Count of 200 set.
>
>
>
> Stateless NiFi would fit this perfectly, but from what I have read, it’s
> not available in v1.9.2? We are stuck on 1.9.2 as we are using the
> Zookeeper provided as part of a Cloudera cluster. However, I’m wondering if
> we would be better suited to the volatile repositories, than writing to
> disk?
>
>
>
> Thanks again for the advice so far Joe, you’ve given us some confidence
> it’s us doing something wrong and not something with NiFi.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Joe Witt [mailto:joe.witt@gmail.com]
> *Sent:* 23 September 2020 19:05
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan
>
>
>
> Not sure what read/write rates you'll get in these RAID-10 configs but
> generally this seems like it should be fine (100s of MB/sec per node range
> at least).  Whereas now you're seeing about 20MB/sec/node.  This is
> definitely very low.
>
>
>
> If you review
> http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html
> then you'll see that we do actually capture attributes such as kafka.topic
> and so on.  Flowfiles would also be properly grouped by that.  What I'm not
> positive of is it could handle reading from multiple topics at the same
> time while also honoring and determining each of their distinct schemas.
> Would need to test/verify that scenario to be sure.  If you do have a bunch
> of topics and they could grow/change then keeping this single processor
> approach makes sense.  If you can go the route of one ConsumeKafkaRecord
> per topic then obviously that would work well.
>
>
>
> Not seeing your flow though I cannot be certain where the bottleneck(s)
> exist and provide guidance.  But this is without a doubt a vital skill to
> achieving maximum performance.
>
>
>
> You'd have to show/share a ton more details for folks here to be helpful
> in walking through the full design.  Or explain the end to end flow.
>
>
>
> As an additional food for thought if the flows are indeed 'from kafka ->
> do stuff -> back to kafka' this is likely a great use case for
> stateless-nifi.
>
>
>
> Thanks
>
>
>
> On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com> wrote:
>
> Hi Joe,
>
>
>
> Thanks for getting back to me so quickly.
>
>
>
> Our disk setup is as follows:
>
> Path      Storage Type                        Format  Capacity  Content
> --------  ----------------------------------  ------  --------  -------
> /         100GB OS SSD                        ext4    89.9GB    OS, NiFi install, Logs
> /data/1/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Database and Flowfile Repos
> /data/2/  8 x 4TB SAS Hard Drives in RAID 10  ext4    14.6TB    Content Repo
> /data/3/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Provenance Repo
> /ssd      1 x 4TB PCIe NVMe SSD               ext4    3.7TB     Content Repo (used instead of /data/2/ as a test,
>                                                                 to see if CPU was bottlenecked by disk operations)
>
>
>
> I will certainly take a look at those. One question with the consume
> record processor is how I would consume from multiple topics and ensure the
> correct Avro schema is used to deserialise the message? We have 1:1 mapping
> of schemas to topics. At the moment the ConsumeKafka processor is reading
> from all topics in one consumer. I’m assuming the kafka.topic attribute
> doesn’t exist at this stage? We use the Avro Schema Registry Controller
> as we don’t have a schema registry in place yet.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Joe Witt [mailto:joe.witt@gmail.com]
> *Sent:* 23 September 2020 17:33
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan
>
>
>
> You have plenty powerful machines to hit super high speeds but what I
> cannot tell is how the disks are setup/capability and layout wise and
> relative to our three repos of importance.  You'll need to share those
> details.
>
>
>
> That said, the design of the flow matters.  The Kafka processors that
> aren't record oriented will perform poorly unless they're acquiring data in
> their natural batches as they arrive from kafka.  In short, use the record
> oriented processors from Kafka.  In it you can even deal with the fact you
> want to go from AVRO to Json and so on.  These processors have a tougher
> learning curve but they perform extremely well and we have powerful
> processors to go along with them for common patterns.
>
>
>
> You absolutely should be able to get to the big numbers you have seen.  It
> requires great flow design (powerful machines are secondary).
>
>
>
> Thanks
>
>
>
> On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com> wrote:
>
> Hi All,
>
>
>
> We’ve got a NiFi 3 Node Cluster running on 3 x 40 CPU, 256GB RAM (32G Java
> Heap) servers. However, we have only been able to achieve a consumption
> rate of ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with
> a production rate of ~16.84GB out of the cluster over 5 mins. This is much
> lower than we were expecting based on what we have read. With this
> throughput we see a CPU load of ~32 on all nodes, so we know there isn’t
> much else we can get out of the CPU.
>
>
>
> We have also tried SSDs, Raided and Unraided HDDs for the content repo
> storage, but they haven’t made a difference to the amount we can process.
>
>
>
> The process is as follows:
>
> 1.       Our flow reads from Kafka Compressed (Maximum of 2000 records
> per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 -->
> UpdateAttribute --> ConvertRecord)
>
> 2.       Depending on which topic the flow file is consumed from, we then
> send the message to one of 10 potential process groups, each containing
> between 3 and 5 processors within the process groups. (RouteOnAttribute -->
> Relevant Processing Group containing JoltTransformJSON and several custom
> processors we have made).
>
> 3.       Finally, we produce the flow file content back to one of several
> Kafka topics, based on the input topic name in Avro format with Snappy
> compression on the Kafka topic.
>
>
>
> Inspecting the queued message counts, it indicates that the Jolt
> Transforms are taking the time to process (Large queues before JOLT
> processors, small or no queues afterwards). But I’m not sure why this is
> any worse than the rest of the processors as the event duration is less
> than a second when inspecting in provenance? We have tuned the number of
> concurrent tasks, duration and schedules to get the performance we have so
> far.
>
>
>
> I’m not sure if there is anything anyone could recommend or suggest to try
> and make improvements? We need to achieve a rate around 5x of what it’s
> currently processing with the same number of nodes. We are running out of
> ideas on how to accomplish this and may have to consider alternatives.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>

RE: NiFi V1.9.2 Performance

Posted by na...@bt.com.
Hi Bryan,

We have configured the processor to read in a maximum batch size of 2k messages, and each flow file does seem to contain more than one Kafka message.

Completely understand on the load balancing. We tried several iterations of one task per topic partition; however, we still found the load skewed towards one specific node. I will try splitting it into multiple processors to see if this handles it any better. We have 10 topics with 11 partitions (one topic with 2 partitions). So I should set all concurrent tasks to 1 with multiple processors (one processor per topic)?

Kind Regards,

Nathan
From: Bryan Bende [mailto:bbende@gmail.com]
Sent: 24 September 2020 13:59
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

I'm not sure how many topics you have, but the biggest improvement would probably be to not do a single message per flow file. You want to batch together lots of messages coming out of ConsumeKafka using a demarcator, and then convert/transform them in batches. You may need to have a separate consumer processor for each topic in order to do this correctly.

You generally shouldn't need to use a load balanced connection after ConsumeKafka. If one node is being favored it is likely that the number of partitions in your topic is not lining up with the # of NiFi nodes X # of concurrent consumer tasks. In the simplest case, if your topic had one partition, and you have 3 NiFi nodes with ConsumeKafka with 1 concurrent task each, then all your messages will only get consumed on one of the nodes. If you have 3 partitions then it should be roughly equal. In your case you have 3 nodes with concurrent tasks set to 12, so you have potentially 36 consumers, which means if you have anything less than 36 partitions then it is not going to be balanced equally.

On Thu, Sep 24, 2020 at 3:54 AM <na...@bt.com> wrote:
Hi Joe,

The Raids seem to give us a good IOPS number when we’ve tried testing them. We have seen a 300ms wait time on the Content Repo, hence why we have tried SSDs for the content repo as we assumed that was the bottleneck. The other Raids seemed OK to us.

I will certainly look into the ConsumeKafkaRecord processor today and will come up with a solution to use it. I will feedback on what I find.

In regards to our current flow configuration, we have the following high-level process groups:

•         ConsumeData -  Consumes the data from Kafka, Add some additional attributes, Converts to JSON from Avro

•         TransformData – This is where the majority of the work happens, it gets routed on an attribute to a set sub-process group based on the type of data it is (this is decided based on the kafka.topic attribute) where the processing happens (Explained in more detail below)

•         ProduceData – Publishes the Record back to Kafka using the PublishKafkaRecord processor, add some attributes to calculate processing duration it took to process and logs this out.

•         Error Handling – A General Catch all. Luckily we don’t see this triggered very often, but it logs a message and writes the file to disk.

ConsumeData process group:

•         ConsumeKafka, as mentioned in earlier emails, consumes the messages from multiple Kafka Topics with a Message Demarcator of an empty string and the Max poll Records set at 2,000.

•         The Consumed Flows queue uses RoundRobin load balancing as we have found one node is favoured for the consumption from Kafka so wanted to distribute the load.

•         AddAttributes uses the UpdateAttribute to add additional information to the flow file (Data Type, Output Kafka Topic) based on the Kafka.topic attribute on the flow file.

•         Finally, Convert Record converts the content of the flow file from Avro to JSON using the AvroRead and JsonRecordSetWriter Controller Services. In addition, we use the inbuilt AvroSchemaRegistry.

TransformData process group. Due to the size, it’s not easy to show in a screenshot.

•         Flow files are routed based on attribute to 1 of 10 processing groups (based on the kafka.topic attribute)

•         Each Data Type has its own processing group; I have gone into detail on one below

•         There are two failure output ports: one for the initial routing issues, another for any errors that have happened in the sub-processing groups. There is also the Successful output port.



HTTP Data Type Sub-processing group (Inside TransformData):

•         The sub-processing groups all follow a somewhat similar layout but have different custom processors after the initial Jolt Transformation

•         The Jolt Transformation does the majority of the heavy lifting in converting the message into the right format to match the output Avro schema requirements. The Jolt Spec contains a chain of operations including modify-overwrite-beta, shift, and default

•         Each processor after the Jolt transformation does something different to meet a specific requirement not possible with other Nifi processors. Some like the IPAddressProcessor are reused in processing for other data types. We are planning to move into a single post-processing group where all data types are routed, before being published to Kafka.

Produce Data, produces the messages back to different Kafka topics which have been defined in the output.topic (set in the consume data stage)

•         PublishKafkaRecord converts the message from JSON back to Avro (Different schema to input) using the JSONTreeReader and AvroRecordSetWriter Controller services. We have set it to Guarantee Single Node Delivery, and use Snappy Compression.

•         Set Exit Attributes adds a Final Processing time to the flow file

•         The Calculate duration uses this to work out its overall time processing

•         Then it’s finally logged to file so we can analyse the full processing time.

Below are the scheduling settings we are using for processors:
Group          Processor            Concurrent  Run       Yield     Other
                                    Tasks       Duration  Duration
-------------  -------------------  ----------  --------  --------  -----
Consume Data   ConsumeKafka         12          0         1 ms      Message Demarcator = "Empty string set"
                                                                    Max Poll Records = 2,000
               AddAttributes        10          0         1 s
               ConvertRecord        10          0         1 s
TransformData  RouteOnAttribute     10          0         1 s
               HTTP Jolt            5           0         200 ms
               HTTP Post Processor  2           0         1 s
Produce Data   PublishKafka         10          0         1 s       Max Request Size = 1 MB
                                                                    Compression Type = snappy
                                                                    batch.size = 80,000 (Based on some custom producers we have written in the past)
                                                                    linger.ms = 50,000 (Based on some custom producers we have written in the past)
               Set Exit Attributes  10          0         1 s
               Calculate Duration   10          0         1 s
               Log Success          10          0         1 s

The Queue configs are as follows:

Queue                                   Back Pressure     Back Pressure   Load Balance
                                        Object Threshold  Size Threshold
--------------------------------------  ----------------  --------------  ------------
After ConsumeKafka                      20,000            1 GB            Round Robin

After Route On Attribute                10,000            1 GB            No
All Other Queues In TransformData

All Failure Routes                      10,000            1 GB            No

Between ConsumerData And TransformData  20,000            1 GB            No
Between TransformData And ProduceData
All Other Queues In ConsumeData
Before Route On Attribute
All Queues In ProduceData

We also have a Maximum Timer Driven Thread Count of 200 set.

Stateless NiFi would fit this perfectly, but from what I have read, it’s not available in v1.9.2? We are stuck on 1.9.2 as we are using the Zookeeper provided as part of a Cloudera cluster. However, I’m wondering if we would be better suited to the volatile repositories, than writing to disk?

Thanks again for the advice so far Joe, you’ve given us some confidence it’s us doing something wrong and not something with NiFi.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com]
Sent: 23 September 2020 19:05
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan

Not sure what read/write rates you'll get in these RAID-10 configs but generally this seems like it should be fine (100s of MB/sec per node range at least).  Whereas now you're seeing about 20MB/sec/node.  This is definitely very low.
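
As a rough check of the per-node figure above, the totals reported at the start of this thread can be converted to MB/sec per node. This is only a sketch: the helper name is made up for illustration, and it assumes decimal units (1 GB = 1000 MB), a 300-second window, and an even split across the 3 nodes.

```python
def mb_per_sec_per_node(gigabytes, seconds=300, nodes=3):
    """Convert a cluster-wide total over a time window to MB/sec per node.

    Assumes 1 GB = 1000 MB and that load is spread evenly across nodes.
    """
    return gigabytes * 1000 / seconds / nodes


# Totals reported for a 5-minute window on the 3-node cluster.
reported = {
    "consumed (compressed)": 9.48,
    "consumed (uncompressed)": 38.53,
    "produced": 16.84,
}

for label, gigabytes in reported.items():
    print(f"{label}: {mb_per_sec_per_node(gigabytes):.1f} MB/sec/node")
```

Under those assumptions the production rate works out to a little under 19 MB/sec per node, which is in line with the roughly 20MB/sec/node mentioned above.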

If you review http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html then you'll see that we do actually capture attributes such as kafka.topic and so on.  Flowfiles would also be properly grouped by that.  What I'm not positive of is whether it could handle reading from multiple topics at the same time while also honoring and determining each of their distinct schemas.  Would need to test/verify that scenario to be sure.  If you do have a bunch of topics and they could grow/change then keeping this single processor approach makes sense.  If you can go the route of one ConsumeKafkaRecord per topic then obviously that would work well.

Not seeing your flow though I cannot be certain where the bottleneck(s) exist and provide guidance.  But this is without a doubt a vital skill to achieving maximum performance.

You'd have to show/share a ton more details for folks here to be helpful in walking through the full design.  Or explain the end to end flow.

As an additional food for thought if the flows are indeed 'from kafka -> do stuff -> back to kafka' this is likely a great use case for stateless-nifi.

Thanks

On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com> wrote:
Hi Joe,

Thanks for getting back to me so quickly.

Our disk setup is as follows:
Path      Storage Type                        Format  Capacity  Content
--------  ----------------------------------  ------  --------  -------
/         100GB OS SSD                        ext4    89.9GB    OS, NiFi install, Logs
/data/1/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Database and Flowfile Repos
/data/2/  8 x 4TB SAS Hard Drives in RAID 10  ext4    14.6TB    Content Repo
/data/3/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Provenance Repo
/ssd      1 x 4TB PCIe NVMe SSD               ext4    3.7TB     Content Repo (used instead of /data/2/ as a test,
                                                                to see if CPU was bottlenecked by disk operations)

I will certainly take a look at those. One question with the consume record processor is how I would consume from multiple topics and ensure the correct Avro schema is used to deserialise the message? We have 1:1 mapping of schemas to topics. At the moment the ConsumeKafka processor is reading from all topics in one consumer. I’m assuming the kafka.topic attribute doesn’t exist at this stage? We use the Avro Schema Registry Controller as we don’t have a schema registry in place yet.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com]
Sent: 23 September 2020 17:33
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan

You have plenty powerful machines to hit super high speeds but what I cannot tell is how the disks are setup/capability and layout wise and relative to our three repos of importance.  You'll need to share those details.

That said, the design of the flow matters.  The Kafka processors that aren't record oriented will perform poorly unless they're acquiring data in their natural batches as they arrive from kafka.  In short, use the record oriented processors from Kafka.  In it you can even deal with the fact you want to go from AVRO to Json and so on.  These processors have a tougher learning curve but they perform extremely well and we have powerful processors to go along with them for common patterns.

You absolutely should be able to get to the big numbers you have seen.  It requires great flow design (powerful machines are secondary).

Thanks

On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com> wrote:
Hi All,

We’ve got a NiFi 3 Node Cluster running on 3 x 40 CPU, 256GB RAM (32G Java Heap) servers. However, we have only been able to achieve a consumption rate of ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with a production rate of ~16.84GB out of the cluster over 5 mins. This is much lower than we were expecting based on what we have read. With this throughput we see a CPU load of ~32 on all nodes, so we know there isn’t much else we can get out of the CPU.

We have also tried SSDs, Raided and Unraided HDDs for the content repo storage, but they haven’t made a difference to the amount we can process.

The process is as follows:

1.       Our flow reads from Kafka Compressed (Maximum of 2000 records per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 --> UpdateAttribute --> ConvertRecord)

2.       Depending on which topic the flow file is consumed from, we then send the message to one of 10 potential process groups, each containing between 3 and 5 processors within the process groups. (RouteOnAttribute --> Relevant Processing Group containing JoltTransformJSON and several custom processors we have made).

3.       Finally, we produce the flow file content back to one of several Kafka topics, based on the input topic name in Avro format with Snappy compression on the Kafka topic.

Inspecting the queued message counts, it indicates that the Jolt Transforms are taking the time to process (Large queues before JOLT processors, small or no queues afterwards). But I’m not sure why this is any worse than the rest of the processors as the event duration is less than a second when inspecting in provenance? We have tuned the number of concurrent tasks, duration and schedules to get the performance we have so far.

I’m not sure if there is anything anyone could recommend or suggest to try and make improvements? We need to achieve a rate around 5x of what it’s currently processing with the same number of nodes. We are running out of ideas on how to accomplish this and may have to consider alternatives.

Kind Regards,

Nathan

Re: NiFi V1.9.2 Performance

Posted by Bryan Bende <bb...@gmail.com>.
I'm not sure how many topics you have, but the biggest improvement would
probably be to not do a single message per flow file. You want to batch
together lots of messages coming out of ConsumeKafka using a demarcator,
and then convert/transform them in batches. You may need to have a separate
consumer processor for each topic in order to do this correctly.
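
The batching idea above can be sketched in a few lines. This is not how ConsumeKafka is implemented; it only illustrates what a demarcator does. The function name is hypothetical, the newline demarcator is an assumption that only suits text payloads such as one JSON document per message (binary Avro can contain the demarcator bytes), and the 2,000 limit mirrors the Max Poll Records value mentioned in this thread.

```python
def batch_with_demarcator(messages, demarcator=b"\n", max_records=2000):
    """Yield payloads that each hold up to max_records messages.

    Messages inside one payload are joined with the demarcator, so a
    downstream step can handle many messages per flow file.
    """
    for start in range(0, len(messages), max_records):
        yield demarcator.join(messages[start:start + max_records])


# 5,000 small JSON messages become 3 payloads instead of 5,000 flow files.
messages = [b'{"id": %d}' % i for i in range(5000)]
batches = list(batch_with_demarcator(messages))

print(len(batches))
print([len(batch.split(b"\n")) for batch in batches])
```

Fewer, larger flow files mean each downstream processor pays its per-flow-file overhead far less often, which is the point of batching before converting and transforming.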

You generally shouldn't need to use a load balanced connection after
ConsumeKafka. If one node is being favored it is likely that the number of
partitions in your topic is not lining up with the # of nifi nodes X # of
concurrent consumer tasks. In the simplest case, if your topic had one
partition, and you have 3 NiFi nodes with ConsumeKafka with 1 concurrent
task each, then all your messages will only get consumed on one of the
nodes. If you have 3 partitions then it should be roughly equal. In your
case you have 3 nodes with concurrent tasks set to 12, so you have
potentially 36 consumers, which means if you have anything less than 36
partitions then it is not going to be balanced equally.
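
To make the arithmetic above concrete, here is a small simulation. It is a sketch under stated assumptions, not Kafka's actual code: partitions are split range-style over an ordered list of consumers, and consumers are listed node by node, which is a worst-case ordering chosen for illustration. The real assignment depends on the configured partition assignment strategy and on how Kafka orders group members. All function and node names are hypothetical.

```python
def range_assign(num_partitions, consumers):
    """Split one topic's partitions over an ordered consumer list.

    Every consumer gets the same base share; the first few consumers
    get one extra partition each until the remainder is used up.
    """
    base, extra = divmod(num_partitions, len(consumers))
    return {c: base + (1 if i < extra else 0) for i, c in enumerate(consumers)}


def partitions_per_node(topic_partitions, nodes, tasks_per_node):
    """Count how many partitions each node ends up consuming."""
    # Assumption: consumers are ordered node by node (worst case).
    consumers = [(node, task) for node in nodes for task in range(tasks_per_node)]
    per_node = {node: 0 for node in nodes}
    for num_partitions in topic_partitions:
        for (node, _task), count in range_assign(num_partitions, consumers).items():
            per_node[node] += count
    return per_node


nodes = ["node1", "node2", "node3"]
print(partitions_per_node([1], nodes, 1))    # one partition, 3 consumers
print(partitions_per_node([3], nodes, 1))    # three partitions, 3 consumers
print(partitions_per_node([11], nodes, 12))  # 11 partitions, 36 consumers
print(partitions_per_node([11], nodes, 4))   # 11 partitions, 12 consumers
```

With 11 partitions and 36 consumers only 11 consumers receive a partition, and in this ordering they all sit on the first node. With 4 tasks per node (12 consumers) the same topic spreads 4/4/3 across the nodes.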

On Thu, Sep 24, 2020 at 3:54 AM <na...@bt.com> wrote:

> Hi Joe,
>
>
>
> The Raids seem to give us a good IOPS number when we’ve tried testing
> them. We have seen a 300ms wait time on the Content Repo, hence why we have
> tried SSDs for the content repo as we assumed that was the bottleneck. The
> other Raids seemed OK to us.
>
>
>
> I will certainly look into the ConsumeKafkaRecord processor today and will
> come up with a solution to use it. I will feedback on what I find.
>
>
>
> In regards to our current flow configuration, we have the following
> high-level process groups:
>
> ·         ConsumeData -  Consumes the data from Kafka, Add some
> additional attributes, Converts to JSON from Avro
>
> ·         TransformData – This is where the majority of the work happens,
> it gets routed on an attribute to a set sub-process group based on the type
> of data it is (this is decided based on the kafka.topic attribute) where
> the processing happens (Explained in more detail below)
>
> ·         ProduceData – Publishes the Record back to Kafka using the
> PublishKafkaRecord processor, add some attributes to calculate processing
> duration it took to process and logs this out.
>
> ·         Error Handling – A General Catch all. Luckily we don’t see this
> triggered very often, but it logs a message and writes the file to disk.
>
>
>
> *ConsumeData* process group:
>
> ·         ConsumeKafka, as mentioned in earlier emails, consumes the
> messages from multiple Kafka Topics with a Message Demarcator of an empty
> string and the Max poll Records set at 2,000.
>
> ·         The Consumed Flows queue uses RoundRobin load balancing as we
> have found one node is favoured for the consumption from Kafka so wanted to
> distribute the load.
>
> ·         AddAttributes uses the UpdateAttribute to add additional
> information to the flow file (Data Type, Output Kafka Topic) based on the
> Kafka.topic attribute on the flow file.
>
> ·         Finally, Convert Record converts the content of the flow file
> from Avro to JSON using the AvroRead and JsonRecordSetWriter Controller
> Services. In addition, we use the inbuilt AvroSchemaRegistry.
>
>
>
> *TransformData* process group. Due to the size, it’s not easy to show in
> a screenshot.
>
> ·         Flow files are routed based on attribute to 1 of 10 processing
> groups (based on the kafka.topic attribute)
>
> ·         Each Data Type has its own processing group; I have gone into
> detail on one below
>
> ·         There are two failure output ports one for the initial routing
> issues, another for any errors that have happened in the sub-processing
> groups. There is also the Successful output port.
>
>
>
>
>
>
>
> *HTTP Data Type Sub-processing group (Inside TransformData):*
>
> ·         The sub-processing groups all follow a somewhat similar layout
> but have different custom processors after the initial Jolt Transformation
>
> ·         The Jolt Transformation does the majority of the heavy lifting
> in converting the message into the right format to match the output Avro
> schema requirements. The Jolt Spec contains a chain of operations including
> modify-overwrite-beta, shift, and default
>
> ·         Each processor after the Jolt transformation does something
> different to meet a specific requirement not possible with other NiFi
> processors. Some like the IPAddressProcessor are reused in processing for
> other data types. We are planning to move into a single post-processing
> group where all data types are routed, before being published to Kafka.
>
>
>
> *Produce Data*, produces the messages back to different Kafka topics
> which have been defined in the output.topic (set in the consume data stage)
>
> ·         PublishKafkaRecord converts the message from JSON back to Avro
> (Different schema to input) using the JsonTreeReader and
> AvroRecordSetWriter Controller services. We have set it to Guarantee Single
> Node Delivery, and use Snappy Compression.
>
> ·         Set Exit Attributes adds a Final Processing time to the flow
> file
>
> ·         The Calculate duration uses this to work out its overall time
> processing
>
> ·         Then it’s finally logged to file so we can analyse the full
> processing time.
>
>
>
> Below are the scheduling settings we are using for processors:
>
> Group          Processor            Concurrent Tasks  Run Duration  Yield Duration
> -------------  -------------------  ----------------  ------------  --------------
> Consume Data   ConsumeKafka         12                0             1 ms
> Consume Data   AddAttributes        10                0             1 s
> Consume Data   ConvertRecord        10                0             1 s
> TransformData  RouteOnAttribute     10                0             1 s
> TransformData  HTTP Jolt            5                 0             200 ms
> TransformData  HTTP Post Processor  2                 0             1 s
> Produce Data   PublishKafka         10                0             1 s
> Produce Data   Set Exit Attributes  10                0             1 s
> Produce Data   Calculate Duration   10                0             1 s
> Produce Data   Log Success          10                0             1 s
>
> Other settings:
> ConsumeKafka: Message Demarcator = "Empty string set"; Max Poll Records = 2,000
> PublishKafka: Max Request Size = 1 MB; Compression Type = snappy;
> batch.size = 80,000 and linger.ms = 50,000 (both based on some custom
> producers we have written in the past)
>
>
>
> The Queue configs are as follows:
>
>
>
> Queue                                  Back Pressure Object Threshold  Back Pressure Size Threshold  Load Balance
> -------------------------------------  ------------------------------  ----------------------------  ------------
> After ConsumeKafka                     20,000                          1 GB                          Round Robin
> After Route On Attribute               10,000                          1 GB                          No
> All Other Queues In TransformData      10,000                          1 GB                          No
> All Failure Routes                     10,000                          1 GB                          No
> Between ConsumeData And TransformData  20,000                          1 GB                          No
> Between TransformData And ProduceData  20,000                          1 GB                          No
> All Other Queues In ConsumeData        20,000                          1 GB                          No
> Before Route On Attribute              20,000                          1 GB                          No
> All Queues In ProduceData              20,000                          1 GB                          No
>
>
>
> We also have a Maximum Timer Driven Thread Count of 200 set.
>
>
>
> Stateless NiFi would fit this perfectly, but from what I have read, it’s
> not available in v1.9.2? We are stuck on 1.9.2 as we are using the
> Zookeeper provided as part of a Cloudera cluster. However, I’m wondering if
> we would be better suited to the volatile repositories, than writing to
> disk?
>
>
>
> Thanks again for the advice so far Joe, you’ve given us some confidence
> it’s us doing something wrong and not something with NiFi.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Joe Witt [mailto:joe.witt@gmail.com]
> *Sent:* 23 September 2020 19:05
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan
>
>
>
> Not sure what read/write rates you'll get in these RAID-10 configs but
> generally this seems like it should be fine (100s of MB/sec per node range
> at least).  Whereas now you're seeing about 20MB/sec/node.  This is
> definitely very low.
>
>
>
> If you review
> http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html
> then you'll see that we do actually capture attributes such as kafka.topic
> and so on.  Flowfiles would also be properly grouped by that.  What I'm not
> positive of is whether it could handle reading from multiple topics at the
> same time while also honoring and determining each of their distinct schemas.
> Would need to test/verify that scenario to be sure.  If you do have a bunch
> of topics and they could grow/change then keeping this single processor
> approach makes sense.  If you can go the route of one ConsumeKafkaRecord
> per topic then obviously that would work well.
>
>
>
> Not seeing your flow though I cannot be certain where the bottleneck(s)
> exist and provide guidance.  But this is without a doubt a vital skill to
> achieving maximum performance.
>
>
>
> You'd have to show/share a ton more details for folks here to be helpful
> in walking through the full design.  Or explain the end to end flow.
>
>
>
> As an additional food for thought if the flows are indeed 'from kafka ->
> do stuff -> back to kafka' this is likely a great use case for
> stateless-nifi.
>
>
>
> Thanks
>
>
>
> On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com> wrote:
>
> Hi Joe,
>
>
>
> Thanks for getting back to me so quickly.
>
>
>
> Our disk setup is as follows:
>
> Path      Storage Type                        Format  Capacity  Content
> --------  ----------------------------------  ------  --------  -------
> /         100GB OS SSD                        ext4    89.9GB    OS, NiFi install, Logs
> /data/1/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Database and Flowfile Repos
> /data/2/  8 x 4TB SAS Hard Drives in RAID 10  ext4    14.6TB    Content Repo
> /data/3/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Provenance Repo
> /ssd      1 x 4TB PCIe NVMe SSD               ext4    3.7TB     Content Repo (used instead of
>           /data/2/ as a test, to see if CPU was bottlenecked by disk operations)
>
>
>
> I will certainly take a look at those. One question with the consume
> record processor is how I would consume from multiple topics and ensure the
> correct Avro schema is used to deserialise the message? We have a 1:1 mapping
> of schemas to topics. At the moment the ConsumeKafka processor is reading
> from all topics in one consumer. I’m assuming the kafka.topic attribute
> doesn’t exist at this stage? We use the Avro Schema Registry
> Controller as we don’t have a schema registry in place yet.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Joe Witt [mailto:joe.witt@gmail.com]
> *Sent:* 23 September 2020 17:33
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan
>
>
>
> You have plenty powerful machines to hit super high speeds but what I
> cannot tell is how the disks are setup/capability and layout wise and
> relative to our three repos of importance.  You'll need to share those
> details.
>
>
>
> That said, the design of the flow matters.  The Kafka processors that
> aren't record oriented will perform poorly unless they're acquiring data in
> their natural batches as they arrive from kafka.  In short, use the record
> oriented processors from Kafka.  In it you can even deal with the fact you
> want to go from AVRO to Json and so on.  These processors have a tougher
> learning curve but they perform extremely well and we have powerful
> processors to go along with them for common patterns.
>
>
>
> You absolutely should be able to get to the big numbers you have seen.  It
> requires great flow design (powerful machines are secondary).
>
>
>
> Thanks
>
>
>
> On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com> wrote:
>
> Hi All,
>
>
>
> We’ve got a NiFi 3 Node Cluster running on 3 x 40 CPU, 256GB RAM (32G Java
> Heap) servers. However, we have only been able to achieve a consumption of
> ~9.48GB Consumption Compressed (38.53GB Uncompressed) over 5 minutes, with
> a production rate of ~16.84GB out of the cluster over  5 mins. This is much
> lower than we were expecting based on what we have read. With this
> throughput we see a CPU load ~32 on all nodes, so we know there isn’t much
> else we can get out of the CPU).
>
>
>
> We have also tried SSDs, Raided and Unraided HDDs for the content repo
> storage, but they haven’t made a difference to the amount we can process.
>
>
>
> The process is as follows:
>
> 1.       Our flow reads from Kafka Compressed (Maximum of 2000 records
> per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 -->
> UpdateAttribute --> ConvertRecord)
>
> 2.       Depending on which topic the flow file is consumed from, we then
> send the message to one of 10 potential process groups, each containing
> between 3 and 5 processors within the process groups. (RouteOnAttribute -->
> Relevant Processing Group containing JoltTransformJSON and several custom
> processors we have made).
>
> 3.       Finally, we produce the flow file content back to one of several
> Kafka topics, based on the input topic name in Avro format with Snappy
> compression on the Kafka topic.
>
>
>
> Inspecting the queued message counts, it indicates that the Jolt
> Transforms are taking the time to process (Large queues before JOLT
> processors, small or no queues afterwards). But I’m not sure why this is
> any worse than the rest of the processors as the event duration is less
> than a second when inspecting in provenance? We have tuned the number of
> concurrent tasks, duration and schedules to get the performance we have so
> far.
>
>
>
> I’m not sure if there is anything anyone could recommend or suggest to try
> and make improvements? We need to achieve a rate around 5x of what it’s
> currently processing with the same number of nodes. We are running out of
> ideas on how to accomplish this and may have to consider alternatives.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>

RE: NiFi V1.9.2 Performance

Posted by na...@bt.com.
Hi Joe,

The Raids seem to give us a good IOPS number when we’ve tried testing them. We have seen a 300ms wait time on the Content Repo, hence why we have tried SSDs for the content repo as we assumed that was the bottleneck. The other Raids seemed OK to us.

I will certainly look into the ConsumeKafkaRecord processor today and will come up with a solution to use it. I will feedback on what I find.

In regards to our current flow configuration, we have the following high-level process groups:

·         ConsumeData – Consumes the data from Kafka, adds some additional attributes, and converts from Avro to JSON

·         TransformData – This is where the majority of the work happens. Flow files are routed on an attribute to a sub-process group based on the type of data (decided by the kafka.topic attribute), where the processing happens (explained in more detail below)

·         ProduceData – Publishes the records back to Kafka using the PublishKafkaRecord processor, adds some attributes to calculate the processing duration, and logs this out.

·         Error Handling – A General Catch all. Luckily we don’t see this triggered very often, but it logs a message and writes the file to disk.

ConsumeData process group:

·         ConsumeKafka, as mentioned in earlier emails, consumes the messages from multiple Kafka topics with a Message Demarcator of an empty string and Max Poll Records set at 2,000.

·         The Consumed Flows queue uses RoundRobin load balancing as we have found one node is favoured for the consumption from Kafka so wanted to distribute the load.

·         AddAttributes uses UpdateAttribute to add additional information to the flow file (Data Type, Output Kafka Topic) based on the kafka.topic attribute on the flow file.

·         Finally, ConvertRecord converts the content of the flow file from Avro to JSON using the AvroReader and JsonRecordSetWriter Controller Services. In addition, we use the built-in AvroSchemaRegistry.

TransformData process group. Due to the size, it’s not easy to show in a screenshot.

·         Flow files are routed based on attribute to 1 of 10 processing groups (based on the kafka.topic attribute)

·         Each Data Type has its own processing group; I have gone into detail on one below

·         There are two failure output ports: one for the initial routing issues, another for any errors that have happened in the sub-processing groups. There is also the Successful output port.



HTTP Data Type Sub-processing group (Inside TransformData):

·         The sub-processing groups all follow a somewhat similar layout but have different custom processors after the initial Jolt Transformation

·         The Jolt Transformation does the majority of the heavy lifting in converting the message into the right format to match the output Avro schema requirements. The Jolt Spec contains a chain of operations including modify-overwrite-beta, shift, and default

·         Each processor after the Jolt transformation does something different to meet a specific requirement not possible with other NiFi processors. Some, like the IPAddressProcessor, are reused in processing for other data types. We are planning to move these into a single post-processing group to which all data types are routed before being published to Kafka.
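
For readers unfamiliar with Jolt, a chain of the three operations named above might look roughly like the following. This is only a sketch: the field names (status, src_ip, method) and the target paths are hypothetical and not taken from the actual spec used in this flow; only the operation names come from the description above. The Python wrapper simply builds the spec and prints it as JSON, which is the form the JoltTransformJSON processor takes in its Jolt Specification property.

```python
import json

# Hypothetical field names throughout; only the three operation
# names (modify-overwrite-beta, shift, default) come from the thread.
jolt_chain_spec = [
    {
        # Overwrite values in place, e.g. coerce a string status code to an integer.
        "operation": "modify-overwrite-beta",
        "spec": {"status": "=toInteger"},
    },
    {
        # Move input fields to the paths expected by the output schema.
        "operation": "shift",
        "spec": {
            "src_ip": "source.ip",
            "status": "http.status",
            "method": "http.method",
        },
    },
    {
        # Fill in fields that are still missing after the shift.
        "operation": "default",
        "spec": {"http": {"method": "UNKNOWN"}},
    },
]


def operations(spec):
    """Return the operation names in the order the chain applies them."""
    return [step["operation"] for step in spec]


if __name__ == "__main__":
    print(operations(jolt_chain_spec))
    print(json.dumps(jolt_chain_spec, indent=2))
```

Each operation in a chain walks the whole document, so a long chain applied to flow files of up to 2,000 records is plausibly CPU-heavy, which would be consistent with the queues building up in front of the Jolt processors.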

ProduceData publishes the messages back to different Kafka topics, which are defined in the output.topic attribute (set in the ConsumeData stage)

·         PublishKafkaRecord converts the message from JSON back to Avro (different schema to input) using the JsonTreeReader and AvroRecordSetWriter Controller Services. We have set it to Guarantee Single Node Delivery, and use Snappy compression.

·         Set Exit Attributes adds a Final Processing time to the flow file

·         Calculate Duration uses this to work out the overall processing time

·         Then it’s finally logged to file so we can analyse the full processing time.

Below are the scheduling settings we are using for processors:
Group          Processor            Concurrent Tasks  Run Duration  Yield Duration
-------------  -------------------  ----------------  ------------  --------------
Consume Data   ConsumeKafka         12                0             1 ms
Consume Data   AddAttributes        10                0             1 s
Consume Data   ConvertRecord        10                0             1 s
TransformData  RouteOnAttribute     10                0             1 s
TransformData  HTTP Jolt            5                 0             200 ms
TransformData  HTTP Post Processor  2                 0             1 s
Produce Data   PublishKafka         10                0             1 s
Produce Data   Set Exit Attributes  10                0             1 s
Produce Data   Calculate Duration   10                0             1 s
Produce Data   Log Success          10                0             1 s

Other settings:
ConsumeKafka: Message Demarcator = "Empty string set"; Max Poll Records = 2,000
PublishKafka: Max Request Size = 1 MB; Compression Type = snappy; batch.size = 80,000 and linger.ms = 50,000 (both based on some custom producers we have written in the past)

The Queue configs are as follows:

Queue                                  Back Pressure Object Threshold  Back Pressure Size Threshold  Load Balance
-------------------------------------  ------------------------------  ----------------------------  ------------
After ConsumeKafka                     20,000                          1 GB                          Round Robin
After Route On Attribute               10,000                          1 GB                          No
All Other Queues In TransformData      10,000                          1 GB                          No
All Failure Routes                     10,000                          1 GB                          No
Between ConsumeData And TransformData  20,000                          1 GB                          No
Between TransformData And ProduceData  20,000                          1 GB                          No
All Other Queues In ConsumeData        20,000                          1 GB                          No
Before Route On Attribute              20,000                          1 GB                          No
All Queues In ProduceData              20,000                          1 GB                          No
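
To make the two thresholds concrete, here is a minimal sketch of how they interact, assuming back pressure applies as soon as either the object count or the total size reaches its configured limit. The class and function names are made up for illustration, and the exact comparison (>= rather than >) and the 2**30-byte definition of 1 GB are assumptions.

```python
from dataclasses import dataclass

GB = 1024 ** 3  # assumption: 1 GB treated as 2**30 bytes


@dataclass(frozen=True)
class QueueConfig:
    """Hypothetical holder for the two back pressure settings of a connection."""
    object_threshold: int       # Back Pressure Object Threshold (flow files)
    size_threshold_bytes: int   # Back Pressure Size Threshold (bytes)


def back_pressure_applies(cfg, queued_objects, queued_bytes):
    """True once either threshold is reached (assumed semantics)."""
    return (queued_objects >= cfg.object_threshold
            or queued_bytes >= cfg.size_threshold_bytes)


# Values taken from the queue settings listed above.
after_consume_kafka = QueueConfig(object_threshold=20_000, size_threshold_bytes=1 * GB)
transform_queue = QueueConfig(object_threshold=10_000, size_threshold_bytes=1 * GB)

if __name__ == "__main__":
    # A TransformData queue holding 10,000 small flow files is already full by count.
    print(back_pressure_applies(transform_queue, 10_000, 200 * 1024 ** 2))
    # The same queue holding a few large flow files is full by size instead.
    print(back_pressure_applies(transform_queue, 500, 1 * GB))
```

Because each flow file here can hold up to 2,000 records, which of the two limits is hit first depends on the average flow file size.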

We also have a Maximum Timer Driven Thread Count of 200 set.
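
As a rough sanity check on the thread budget, the Concurrent Tasks values from the scheduling settings above can be added up and compared with the 200-thread pool. This is only a sketch: it covers the processors listed above (one of the ten sub-processing groups), and it assumes both the thread count and the concurrent task settings apply per node.

```python
# Concurrent Tasks per processor, copied from the scheduling settings above.
concurrent_tasks = {
    "ConsumeKafka": 12,
    "AddAttributes": 10,
    "ConvertRecord": 10,
    "RouteOnAttribute": 10,
    "HTTP Jolt": 5,
    "HTTP Post Processor": 2,
    "PublishKafka": 10,
    "Set Exit Attributes": 10,
    "Calculate Duration": 10,
    "Log Success": 10,
}

MAX_TIMER_DRIVEN_THREADS = 200  # from the message above
CPUS_PER_NODE = 40              # from the original post


def total_concurrent_tasks(tasks):
    """Sum of the configured concurrent tasks across the listed processors."""
    return sum(tasks.values())


if __name__ == "__main__":
    total = total_concurrent_tasks(concurrent_tasks)
    print(f"listed processors: {total} of {MAX_TIMER_DRIVEN_THREADS} threads")
    print(f"HTTP Jolt share of listed tasks: {concurrent_tasks['HTTP Jolt'] / total:.1%}")
```

The listed processors account for 89 concurrent tasks, of which the Jolt step, where the queues build up, has 5.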

Stateless NiFi would fit this perfectly, but from what I have read, it’s not available in v1.9.2? We are stuck on 1.9.2 as we are using the Zookeeper provided as part of a Cloudera cluster. However, I’m wondering if we would be better suited to the volatile repositories than to writing to disk?

Thanks again for the advice so far Joe, you’ve given us some confidence it’s us doing something wrong and not something with NiFi.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com]
Sent: 23 September 2020 19:05
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan

Not sure what read/write rates you'll get in these RAID-10 configs but generally this seems like it should be fine (100s of MB/sec per node range at least).  Whereas now you're seeing about 20MB/sec/node.  This is definitely very low.

If you review http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html then you'll see that we do actually capture attributes such as kafka.topic and so on.  Flowfiles would also be properly grouped by that.  What I'm not positive of is whether it could handle reading from multiple topics at the same time while also honoring and determining each of their distinct schemas.  Would need to test/verify that scenario to be sure.  If you do have a bunch of topics and they could grow/change then keeping this single processor approach makes sense.  If you can go the route of one ConsumeKafkaRecord per topic then obviously that would work well.

Not seeing your flow though I cannot be certain where the bottleneck(s) exist and provide guidance.  But this is without a doubt a vital skill to achieving maximum performance.

You'd have to show/share a ton more details for folks here to be helpful in walking through the full design.  Or explain the end to end flow.

As an additional food for thought if the flows are indeed 'from kafka -> do stuff -> back to kafka' this is likely a great use case for stateless-nifi.

Thanks

On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com> wrote:
Hi Joe,

Thanks for getting back to me so quickly.

Our disk setup is as follows:
Path      Storage Type                        Format  Capacity  Content
--------  ----------------------------------  ------  --------  -------
/         100GB OS SSD                        ext4    89.9GB    OS, NiFi install, Logs
/data/1/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Database and Flowfile Repos
/data/2/  8 x 4TB SAS Hard Drives in RAID 10  ext4    14.6TB    Content Repo
/data/3/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Provenance Repo
/ssd      1 x 4TB PCIe NVMe SSD               ext4    3.7TB     Content Repo (used instead of /data/2/ as a test, to see if CPU was bottlenecked by disk operations)

I will certainly take a look at those. One question with the consume record processor is how I would consume from multiple topics and ensure the correct Avro schema is used to deserialise the message? We have a 1:1 mapping of schemas to topics. At the moment the ConsumeKafka processor is reading from all topics in one consumer. I’m assuming the kafka.topic attribute doesn’t exist at this stage? We use the Avro Schema Registry Controller as we don’t have a schema registry in place yet.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com]
Sent: 23 September 2020 17:33
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan

You have plenty powerful machines to hit super high speeds but what I cannot tell is how the disks are setup/capability and layout wise and relative to our three repos of importance.  You'll need to share those details.

That said, the design of the flow matters.  The Kafka processors that aren't record oriented will perform poorly unless they're acquiring data in their natural batches as they arrive from kafka.  In short, use the record oriented processors from Kafka.  In it you can even deal with the fact you want to go from AVRO to Json and so on.  These processors have a tougher learning curve but they perform extremely well and we have powerful processors to go along with them for common patterns.

You absolutely should be able to get to the big numbers you have seen.  It requires great flow design (powerful machines are secondary).

Thanks

On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com> wrote:
Hi All,

We’ve got a NiFi 3 Node Cluster running on 3 x 40 CPU, 256GB RAM (32G Java Heap) servers. However, we have only been able to achieve a consumption of ~9.48GB Consumption Compressed (38.53GB Uncompressed) over 5 minutes, with a production rate of ~16.84GB out of the cluster over  5 mins. This is much lower than we were expecting based on what we have read. With this throughput we see a CPU load ~32 on all nodes, so we know there isn’t much else we can get out of the CPU).

We have also tried SSDs, Raided and Unraided HDDs for the content repo storage, but they haven’t made a difference to the amount we can process.

The process is as follows:

1.       Our flow reads from Kafka Compressed (Maximum of 2000 records per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 --> UpdateAttribute --> ConvertRecord)

2.       Depending on which topic the flow file is consumed from, we then send the message to one of 10 potential process groups, each containing between 3 and 5 processors within the process groups. (RouteOnAttribute --> Relevant Processing Group containing JoltTransformJSON and several custom processors we have made).

3.       Finally, we produce the flow file content back to one of several Kafka topics, based on the input topic name in Avro format with Snappy compression on the Kafka topic.

Inspecting the queued message counts, it indicates that the Jolt Transforms are taking the time to process (Large queues before JOLT processors, small or no queues afterwards). But I’m not sure why this is any worse than the rest of the processors as the event duration is less than a second when inspecting in provenance? We have tuned the number of concurrent tasks, duration and schedules to get the performance we have so far.

I’m not sure if there is anything anyone could recommend or suggest to try and make improvements? We need to achieve a rate around 5x of what it’s currently processing with the same number of nodes. We are running out of ideas on how to accomplish this and may have to consider alternatives.

Kind Regards,

Nathan

Re: NiFi V1.9.2 Performance

Posted by Joe Witt <jo...@gmail.com>.
Nathan

Not sure what read/write rates you'll get in these RAID-10 configs but
generally this seems like it should be fine (100s of MB/sec per node range
at least).  Whereas now you're seeing about 20MB/sec/node.  This is
definitely very low.
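
A quick check of that per-node figure, using the numbers from the original post (9.48GB compressed in, 38.53GB uncompressed, 16.84GB out, over 5 minutes on 3 nodes). This is only a sketch of the arithmetic; it assumes decimal units (1 GB = 1000 MB) and an even spread of load across the nodes.

```python
WINDOW_SECONDS = 5 * 60   # measurements were taken over 5 minutes
NODES = 3                 # 3-node cluster
MB_PER_GB = 1000          # assumption: decimal units


def mb_per_sec_per_node(gigabytes):
    """Average MB/s per node for a cluster-wide volume over the window."""
    return gigabytes * MB_PER_GB / WINDOW_SECONDS / NODES


# Cluster-wide volumes from the original post, in GB per 5 minutes.
volumes_gb = {
    "consumed (compressed)": 9.48,
    "consumed (uncompressed)": 38.53,
    "produced": 16.84,
}

if __name__ == "__main__":
    for label, gb in volumes_gb.items():
        print(f"{label}: {mb_per_sec_per_node(gb):.1f} MB/s per node")
    target = 5 * mb_per_sec_per_node(volumes_gb["produced"])
    print(f"5x target on produced data: {target:.0f} MB/s per node")
```

Under these assumptions the produced rate works out to roughly 18.7 MB/s per node, in line with the "about 20MB/sec/node" above, and the 5x target from the original post would be roughly 94 MB/s per node.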

If you review
http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html
then you'll see that we do actually capture attributes such as kafka.topic
and so on.  Flowfiles would also be properly grouped by that.  What I'm not
positive of is whether it could handle reading from multiple topics at the
same time while also honoring and determining each of their distinct schemas.
Would need to test/verify that scenario to be sure.  If you do have a bunch
of topics and they could grow/change then keeping this single processor
approach makes sense.  If you can go the route of one ConsumeKafkaRecord
per topic then obviously that would work well.

Not seeing your flow though I cannot be certain where the bottleneck(s)
exist and provide guidance.  But this is without a doubt a vital skill to
achieving maximum performance.

You'd have to show/share a ton more details for folks here to be helpful in
walking through the full design.  Or explain the end to end flow.

As an additional food for thought if the flows are indeed 'from kafka -> do
stuff -> back to kafka' this is likely a great use case for stateless-nifi.

Thanks

On Wed, Sep 23, 2020 at 10:43 AM <na...@bt.com> wrote:

> Hi Joe,
>
>
>
> Thanks for getting back to me so quickly.
>
>
>
> Our disk setup is as follows:
>
> Path      Storage Type                        Format  Capacity  Content
> --------  ----------------------------------  ------  --------  -------
> /         100GB OS SSD                        ext4    89.9GB    OS, NiFi install, Logs
> /data/1/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Database and Flowfile Repos
> /data/2/  8 x 4TB SAS Hard Drives in RAID 10  ext4    14.6TB    Content Repo
> /data/3/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Provenance Repo
> /ssd      1 x 4TB PCIe NVMe SSD               ext4    3.7TB     Content Repo (used instead of
>           /data/2/ as a test, to see if CPU was bottlenecked by disk operations)
>
>
>
> I will certainly take a look at those. One question with the consume
> record processor is how I would consume from multiple topics and ensure the
> correct Avro schema is used to deserialise the message? We have a 1:1 mapping
> of schemas to topics. At the moment the ConsumeKafka processor is reading
> from all topics in one consumer. I’m assuming the kafka.topic attribute
> doesn’t exist at this stage? We use the Avro Schema Registry
> Controller as we don’t have a schema registry in place yet.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Joe Witt [mailto:joe.witt@gmail.com]
> *Sent:* 23 September 2020 17:33
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan
>
>
>
> You have plenty powerful machines to hit super high speeds but what I
> cannot tell is how the disks are setup/capability and layout wise and
> relative to our three repos of importance.  You'll need to share those
> details.
>
>
>
> That said, the design of the flow matters.  The Kafka processors that
> aren't record oriented will perform poorly unless they're acquiring data in
> their natural batches as they arrive from kafka.  In short, use the record
> oriented processors from Kafka.  In it you can even deal with the fact you
> want to go from AVRO to Json and so on.  These processors have a tougher
> learning curve but they perform extremely well and we have powerful
> processors to go along with them for common patterns.
>
>
>
> You absolutely should be able to get to the big numbers you have seen.  It
> requires great flow design (powerful machines are secondary).
>
>
>
> Thanks
>
>
>
> On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com> wrote:
>
> Hi All,
>
>
>
> We’ve got a NiFi 3 Node Cluster running on 3 x 40 CPU, 256GB RAM (32G Java
> Heap) servers. However, we have only been able to achieve a consumption of
> ~9.48GB Consumption Compressed (38.53GB Uncompressed) over 5 minutes, with
> a production rate of ~16.84GB out of the cluster over  5 mins. This is much
> lower than we were expecting based on what we have read. With this
> throughput we see a CPU load ~32 on all nodes, so we know there isn’t much
> else we can get out of the CPU).
>
>
>
> We have also tried SSDs, Raided and Unraided HDDs for the content repo
> storage, but they haven’t made a difference to the amount we can process.
>
>
>
> The process is as follows:
>
> 1.       Our flow reads from Kafka Compressed (Maximum of 2000 records
> per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 -->
> UpdateAttribute --> ConvertRecord)
>
> 2.       Depending on which topic the flow file is consumed from, we then
> send the message to one of 10 potential process groups, each containing
> between 3 and 5 processors within the process groups. (RouteOnAttribute -->
> Relevant Processing Group containing JoltTransformJSON and several custom
> processors we have made).
>
> 3.       Finally, we produce the flow file content back to one of several
> Kafka topics, based on the input topic name in Avro format with Snappy
> compression on the Kafka topic.
>
>
>
> Inspecting the queued message counts, it indicates that the Jolt
> Transforms are taking the time to process (Large queues before JOLT
> processors, small or no queues afterwards). But I’m not sure why this is
> any worse than the rest of the processors as the event duration is less
> than a second when inspecting in provenance? We have tuned the number of
> concurrent tasks, duration and schedules to get the performance we have so
> far.
>
>
>
> I’m not sure if there is anything anyone could recommend or suggest to try
> and make improvements? We need to achieve a rate around 5x of what it’s
> currently processing with the same number of nodes. We are running out of
> ideas on how to accomplish this and may have to consider alternatives.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>

RE: NiFi V1.9.2 Performance

Posted by na...@bt.com.
Hi Joe,

Thanks for getting back to me so quickly.

Our disk setup is as follows:
Path      Storage Type                        Format  Capacity  Content
--------  ----------------------------------  ------  --------  -------
/         100GB OS SSD                        ext4    89.9GB    OS, NiFi install, Logs
/data/1/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Database and Flowfile Repos
/data/2/  8 x 4TB SAS Hard Drives in RAID 10  ext4    14.6TB    Content Repo
/data/3/  2 x 4TB SAS Hard Drives in RAID 1   ext4    3.7TB     Provenance Repo
/ssd      1 x 4TB PCIe NVMe SSD               ext4    3.7TB     Content Repo (used instead of /data/2/ as a test, to see if CPU was bottlenecked by disk operations)

I will certainly take a look at those. One question with the consume record processor is how I would consume from multiple topics and ensure the correct Avro schema is used to deserialise the message? We have a 1:1 mapping of schemas to topics. At the moment the ConsumeKafka processor is reading from all topics in one consumer. I’m assuming the kafka.topic attribute doesn’t exist at this stage? We use the Avro Schema Registry Controller as we don’t have a schema registry in place yet.
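
To illustrate the 1:1 topic-to-schema mapping being asked about, here is a minimal sketch of the lookup itself, outside NiFi. The topic names and schemas are hypothetical, and the function is only an illustration of the idea, not how any NiFi processor or controller service resolves schemas.

```python
# Hypothetical topics and Avro schemas; the real flow has around ten of each.
SCHEMAS_BY_TOPIC = {
    "http-events": {
        "type": "record",
        "name": "HttpEvent",
        "fields": [
            {"name": "src_ip", "type": "string"},
            {"name": "status", "type": "int"},
        ],
    },
    "dns-events": {
        "type": "record",
        "name": "DnsEvent",
        "fields": [
            {"name": "query", "type": "string"},
        ],
    },
}


def schema_for_topic(topic):
    """Pick the Avro schema for a message based on the topic it came from."""
    try:
        return SCHEMAS_BY_TOPIC[topic]
    except KeyError:
        raise ValueError(f"no Avro schema registered for topic {topic!r}") from None


if __name__ == "__main__":
    print(schema_for_topic("http-events")["name"])
```

In NiFi terms this would presumably correspond to registering each schema in the AvroSchemaRegistry under a name derived from the topic. Whether a single record-oriented consumer can apply that lookup per topic is the open question here.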

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.witt@gmail.com]
Sent: 23 September 2020 17:33
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan

You have plenty powerful machines to hit super high speeds but what I cannot tell is how the disks are setup/capability and layout wise and relative to our three repos of importance.  You'll need to share those details.

That said, the design of the flow matters.  The Kafka processors that aren't record oriented will perform poorly unless they're acquiring data in their natural batches as they arrive from kafka.  In short, use the record oriented processors from Kafka.  In it you can even deal with the fact you want to go from AVRO to Json and so on.  These processors have a tougher learning curve but they perform extremely well and we have powerful processors to go along with them for common patterns.

You absolutely should be able to get to the big numbers you have seen.  It requires great flow design (powerful machines are secondary).

Thanks

On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com> wrote:
Hi All,

We’ve got a NiFi 3 Node Cluster running on 3 x 40 CPU, 256GB RAM (32G Java Heap) servers. However, we have only been able to achieve a consumption of ~9.48GB Consumption Compressed (38.53GB Uncompressed) over 5 minutes, with a production rate of ~16.84GB out of the cluster over  5 mins. This is much lower than we were expecting based on what we have read. With this throughput we see a CPU load ~32 on all nodes, so we know there isn’t much else we can get out of the CPU).

We have also tried SSDs, Raided and Unraided HDDs for the content repo storage, but they haven’t made a difference to the amount we can process.

The process is as follows:

1.       Our flow reads from Kafka Compressed (Maximum of 2000 records per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 --> UpdateAttribute --> ConvertRecord)

2.       Depending on which topic the flow file is consumed from, we then send the message to one of 10 potential process groups, each containing between 3 and 5 processors within the process groups. (RouteOnAttribute --> Relevant Processing Group containing JoltTransformJSON and several custom processors we have made).

3.       Finally, we produce the flow file content back to one of several Kafka topics, based on the input topic name in Avro format with Snappy compression on the Kafka topic.

Inspecting the queued message counts, it indicates that the Jolt Transforms are taking the time to process (Large queues before JOLT processors, small or no queues afterwards). But I’m not sure why this is any worse than the rest of the processors as the event duration is less than a second when inspecting in provenance? We have tuned the number of concurrent tasks, duration and schedules to get the performance we have so far.

I’m not sure if there is anything anyone could recommend or suggest to try and make improvements? We need to achieve a rate around 5x of what it’s currently processing with the same number of nodes. We are running out of ideas on how to accomplish this and may have to consider alternatives.

Kind Regards,

Nathan

Re: NiFi V1.9.2 Performance

Posted by Joe Witt <jo...@gmail.com>.
Nathan

Your machines are plenty powerful enough to hit very high speeds, but what I
cannot tell is how the disks are set up, in terms of capability and layout,
relative to our three repos of importance.  You'll need to share those
details.
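
For context, the "three repos" are presumably NiFi's FlowFile, content, and
provenance repositories. A minimal sketch of giving each one its own disk in
nifi.properties follows. The property names are NiFi's own, but the /data/...
mount points are hypothetical and assume three separately mounted volumes:

```properties
# nifi.properties (fragment); the paths below are hypothetical examples
nifi.flowfile.repository.directory=/data/disk1/flowfile_repository
nifi.content.repository.directory.default=/data/disk2/content_repository
nifi.provenance.repository.directory.default=/data/disk3/provenance_repository
```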

That said, the design of the flow matters.  The Kafka processors that
aren't record oriented will perform poorly unless they're acquiring data in
their natural batches as they arrive from Kafka.  In short, use the
record-oriented Kafka processors.  With them you can even handle the
conversion from Avro to JSON and so on.  These processors have a tougher
learning curve but they perform extremely well, and we have powerful
processors to go along with them for common patterns.

You absolutely should be able to get to the big numbers you have seen.  It
requires great flow design (powerful machines are secondary).
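
To put the figures from the original post in per-node terms, here is a quick
back-of-the-envelope calculation. It is only a sketch: it assumes GB means
10**9 bytes and that the load is spread evenly across the three nodes, and the
function name is made up for illustration.

```python
# Figures quoted in the original post; GB is assumed to mean 10**9 bytes.
WINDOW_SECONDS = 5 * 60
NODES = 3


def rate_mb_per_s(gigabytes, seconds=WINDOW_SECONDS):
    """Convert a volume moved over a time window into MB/s."""
    return gigabytes * 1000 / seconds


observed_gb = {
    "consumed (compressed)": 9.48,
    "consumed (uncompressed)": 38.53,
    "produced": 16.84,
}

for label, gb in observed_gb.items():
    cluster = rate_mb_per_s(gb)
    per_node = cluster / NODES  # assumes an even spread across nodes
    target = cluster * 5        # the 5x goal stated in the original post
    print(f"{label}: {cluster:.1f} MB/s cluster, "
          f"{per_node:.1f} MB/s per node, 5x target {target:.1f} MB/s")
```

Under those assumptions the cluster is consuming roughly 32 MB/s compressed,
or about 10.5 MB/s per node.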

Thanks

On Wed, Sep 23, 2020 at 9:26 AM <na...@bt.com> wrote:

> Hi All,
>
>
>
> We’ve got a NiFi 3 Node Cluster running on 3 x 40 CPU, 256GB RAM (32G Java
> Heap) servers. However, we have only been able to achieve a consumption of
> ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with
> a production rate of ~16.84GB out of the cluster over 5 mins. This is much
> lower than we were expecting based on what we have read. With this
> throughput we see a CPU load ~32 on all nodes, so we know there isn’t much
> else we can get out of the CPU.
>
>
>
> We have also tried SSDs, Raided and Unraided HDDs for the content repo
> storage, but they haven’t made a difference to the amount we can process.
>
>
>
> The process is as follows:
>
> 1.       Our flow reads from Kafka Compressed (Maximum of 2000 records
> per file). It then converts them from Avro to JSON. (ConsumeKafka_0_10 -->
> UpdateAttribute --> ConvertRecord)
>
> 2.       Depending on which topic the flow file is consumed from, we then
> send the message to one of 10 potential process groups, each containing
> between 3 and 5 processors within the process groups. (RouteOnAttribute -->
> Relevant Processing Group containing JoltTransformJSON and several custom
> processors we have made).
>
> 3.       Finally, we produce the flow file content back to one of several
> Kafka topics, based on the input topic name in Avro format with Snappy
> compression on the Kafka topic.
>
>
>
> Inspecting the queued message counts, it indicates that the Jolt
> Transforms are taking the time to process (Large queues before JOLT
> processors, small or no queues afterwards). But I’m not sure why this is
> any worse than the rest of the processors as the event duration is less
> than a second when inspecting in provenance? We have tuned the number of
> concurrent tasks, duration and schedules to get the performance we have so
> far.
>
>
>
> I’m not sure if there is anything anyone could recommend or suggest to try
> and make improvements? We need to achieve a rate around 5x of what it’s
> currently processing with the same number of nodes. We are running out of
> ideas on how to accomplish this and may have to consider alternatives.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>