Posted to users@nifi.apache.org by "Jens M. Kofoed" <jm...@gmail.com> on 2022/08/30 06:31:52 UTC

Need help to merge all records in cluster into one flowfile

Hi all

I'm running a 3-node cluster on version 1.16.2. I'm using
the SiteToSiteStatusReportingTask to monitor and check for any
backpressure or queues. I'm trying to merge all 3 reports into 1, but most
of the time I get 2 flowfiles after my MergeRecord.

To be sure the nodes create the reports at the same time, the
SiteToSiteStatusReportingTask is scheduled via the CRON driver every 5
mins.
The connection from the input port to the next processor has its "Load
Balance Strategy" set to Single node, to be sure all 3 reports end up on the
same node.
In my MergeRecord the "Correlation Attribute Name" is set to
"reporting.task.uuid", which is the same for all 3 flowfiles.
"Minimum Number of Records" is set to 5000, which is much higher than the
total number of records.
"Minimum Bin Size" is set to 5 MB, which is also much higher than the total
size. "Maximum Number of Bins" is at the default: 10.
"Max Bin Age" is set to 10 s.

With these settings I was hoping that all 3 reports would be on the same
node within a few seconds, and that MergeRecord would merge all 3
flowfiles into 1. But many times MergeRecord outputs 2 flowfiles.

Any ideas how to force them all into one flowfile?

Kind regards
Jens M. Kofoed

Re: Need help to merge all records in cluster into one flowfile

Posted by "Jens M. Kofoed" <jm...@gmail.com>.
Hi Chris and Mark

Many thanks for your replies. You are totally right (of course :-) ), and that
is also the knowledge and understanding I had (have), except for the fact that
the EL is only evaluated using the variable registry. Sorry for that :-)

My goal is to have only one flow file, with all records from the
SiteToSiteStatusReportingTask. In my downstream flow, I check for
backpressure and queues and create triggers to other systems. If 2 or more
flow files are processed one after another, a backpressure issue in the
first flow file will be overwritten with an OK from the next flow file.
That's why I want to merge them.

To debug what is going on, I create the report every minute by cron (* /1
* * *), and most of the time (more than 90%) all 3 flow files are merged
into 1 flow file because the Minimum Number of Records is reached AND
Minimum Bin Size is reached. The 3 flow files are merged within
milliseconds, so the processor does exactly what it should do.
The issue is that sometimes it doesn't merge, times out, and hits
the "Max Bin Age". The 3 flow files have exactly the same number of records,
and the size of course varies depending on the amount of data in queues
across the whole flow.
With the Minimum Bin Size set to 0 B, only the "Minimum Number
of Records" should come into play. And as I wrote above, it works great most
of the time.
If I change the SiteToSiteStatusReportingTask to create reports with a
lower batch size, each node will create more flow files. With 9 flow
files at the input port, which together have exactly the same number of
records and size as the 3 flow files, it never bins them all together into
one file. If I change the Max Bin Age, it just takes longer for the processor
to "time out".

So my question is rather why 3 flow files merge 90+% of the time and not
all the time, since the number of records is the same. That is
what worries me.

But thanks Mark, I will give MergeContent a try and fix the JSON array
so it doesn't break afterwards in the downstream flow.

But many thanks again to both of you.

kind regards
Jens M. Kofoed




Re: Need help to merge all records in cluster into one flowfile

Posted by Mark Payne <ma...@hotmail.com>.
Thanks Chris. That’s exactly right.

Given that you’re seeing that Max Bin Age is the cause, the solution would be to increase the Max Bin Age if you want fewer FlowFiles.

The data is merged when any one of the following conditions is met:

- Minimum Number of Records is reached AND Minimum Bin Size is reached
OR
- Maximum Number of Records is reached OR Maximum Bin Size is reached
OR
- Max Bin Age is reached
OR
- Maximum Number of Bins is reached AND a new FlowFile is encountered that belongs in a different bin than any of the existing ones (only valid if using a Correlation Attribute).

So in your case, you’re not hitting the minimum number of records, but you are hitting the Max Bin Age so it’s merging.
The idea behind Max Bin Age is that it’s basically a timeout. It prevents data from stacking up for too long, introducing too large of a latency.
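
The merge conditions listed above can be sketched as a small decision function. This is only an illustrative model of the rules as described in this message, not NiFi's actual code; the class names, the function name, the record and byte counts, and the max_records value are all assumptions made up for the example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BinState:
    records: int        # records currently held in the bin
    size_bytes: int     # total content size currently held in the bin
    age_seconds: float  # time since the bin was created


@dataclass
class MergeSettings:
    min_records: int
    min_size_bytes: int
    max_records: int
    max_size_bytes: Optional[int] = None
    max_bin_age_seconds: Optional[float] = None


def merge_reason(state: BinState, cfg: MergeSettings,
                 evicted_for_new_bin: bool = False) -> Optional[str]:
    """Return the first condition that triggers a merge, or None to keep waiting."""
    if state.records >= cfg.min_records and state.size_bytes >= cfg.min_size_bytes:
        return "minimum records AND minimum size reached"
    if state.records >= cfg.max_records:
        return "maximum records reached"
    if cfg.max_size_bytes is not None and state.size_bytes >= cfg.max_size_bytes:
        return "maximum size reached"
    if cfg.max_bin_age_seconds is not None and state.age_seconds >= cfg.max_bin_age_seconds:
        return "max bin age reached"
    if evicted_for_new_bin:
        return "all bins in use and a FlowFile for a new bin arrived"
    return None


# Minimums and Max Bin Age from the original post; max_records is a placeholder.
cfg = MergeSettings(min_records=5000, min_size_bytes=5 * 1024 * 1024,
                    max_records=100_000, max_bin_age_seconds=10.0)

# Hypothetical bin holding two of the three reports, early and at the timeout.
print(merge_reason(BinState(records=1400, size_bytes=1_800_000, age_seconds=3.0), cfg))
print(merge_reason(BinState(records=1400, size_bytes=1_800_000, age_seconds=10.0), cfg))
```

With minimums set far above the real totals, the only condition such a bin can ever satisfy is the Max Bin Age, which matches the provenance details reported in this thread.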

Now, that said, what you’re after is really not something that’s as easily supported by this Processor, because you’re not really looking to pack together Records in order to build a larger bundle. You’re looking to pack together records in order to re-join specific sets of Records. So you might actually want to consider using MergeContent instead of MergeRecord. Assuming that your data is in JSON format, you can use MergeContent’s header/footer/demarcator properties to ensure that you still have valid JSON. But with MergeContent you specify min/max based on number of FlowFiles, not number of Records. So you can set Minimum Number of Entries to 3 (assuming you have 3 nodes in your cluster). That’ll wait for 3 FlowFiles, presumably one from each node. And set a Max Bin Age short enough that even if a node doesn’t send because the node is stopped, you still merge data from the other 2 nodes.
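
To illustrate what a header/footer/demarcator merge does to JSON content, here is a rough sketch in plain Python (it does not call NiFi). It assumes each report FlowFile already contains a JSON array; the field names "node" and "queued" and the helper name are invented for the example.

```python
import json


def merge_with_wrappers(contents, header="[", footer="]", demarcator=","):
    """Concatenate FlowFile contents the way header/footer/demarcator would."""
    return header + demarcator.join(contents) + footer


# Hypothetical report contents, one JSON array per node.
reports = [
    '[{"node": "n1", "queued": 0}]',
    '[{"node": "n2", "queued": 12}]',
    '[{"node": "n3", "queued": 0}]',
]

merged = merge_with_wrappers(reports)
nested = json.loads(merged)  # valid JSON, but an array of three arrays

# Flatten back to a single array of records for the downstream flow.
flat = [record for report in nested for record in report]
print(len(nested), len(flat))
```

Under that assumption the merged content is valid JSON but nested one level deeper than each input, so a downstream step that expects a flat array of records would need to flatten it.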

Thanks
-Mark







Re: Need help to merge all records in cluster into one flowfile

Posted by Chris Sampson <ch...@naimuri.com>.
For “Minimum Number of Records”, the docs [1] indicate that the field does support Expression Language but “will be evaluated using variable registry only”, i.e. it doesn’t use FlowFile attributes, which it appears you’re trying to do in your example within this email chain.
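
A simplified model of why an attribute reference fails validation with an empty-string number error is sketched below. It is not NiFi's implementation; it only assumes that an unknown name resolves to an empty string, which is consistent with the error message quoted in this thread. The function names and the registry variable "min.records" are hypothetical.

```python
def evaluate_against_registry(variable_name: str, registry: dict) -> str:
    """Resolve a reference using only the registry; unknown names yield ""."""
    return registry.get(variable_name, "")


def validate_min_records(variable_name: str, registry: dict) -> int:
    """Parse the resolved value as an integer, as a numeric property requires."""
    return int(evaluate_against_registry(variable_name, registry))


# "record.count" is a FlowFile attribute, so it is absent from the registry.
try:
    validate_min_records("record.count", registry={})
except ValueError as err:
    print("invalid:", err)

# A hypothetical registry variable named "min.records" resolves and parses fine.
print(validate_min_records("min.records", registry={"min.records": "1500"}))
```

In this model the failure comes from parsing an empty string as a number, not from the expression syntax itself.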

If your provenance is showing “Records Merged due to: Bin has reached Max Bin Age”, wouldn’t it be a good idea to increase the “Max Bin Age” from the “10s” you indicate in your original email? If you set this to, say, “5 mins”, do you see the number of resultant FlowFiles reduce, with more input Records included within each output FlowFile?

Basically, your provenance seems to suggest that you need to allow a longer period of time for your data to reach the MergeRecord processor and be combined. My understanding from a quick look at the processor’s “Additional Details” [2] (see section “When a Bin is Merged”) is that the Bin will be merged & output once the “Max Bin Age” (if configured) is reached, regardless of whether the “Minimum Number of Records” has been reached. Likewise, I’d expect that the merged output would happen if “Maximum Number of Records” is reached, irrespective of any “Max Bin Age” settings.


Caveat: I don’t really use MergeRecord

[1]: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.17.0/org.apache.nifi.processors.standard.MergeRecord/index.html
[2]: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.17.0/org.apache.nifi.processors.standard.MergeRecord/additionalDetails.html




Re: Need help to merge all records in cluster into one flowfile

Posted by "Jens M. Kofoed" <jm...@gmail.com>.
Hi
By decreasing the batch size for the SiteToSiteStatusReportingTask I get
even more flowfiles. So just for testing I now have a total of 9 files
(2.75 MB) in the incoming queue to MergeRecord.
The total number of records is above 2000, so I have set the "Minimum Number
of Records" to 1500 and the "Minimum Bin Size" to 2 MB.
The result is 3 flowfiles, which all have "Records Merged due to: Bin
has reached Max Bin Age". Why?
All 9 files should be merged into one file, since the total number of
records exceeds the minimum.

Kind regards
Jens M. Kofoed


Re: Need help to merge all records in cluster into one flowfile

Posted by "Jens M. Kofoed" <jm...@gmail.com>.
Hey Mark

I tried another idea: dynamically setting the "Minimum Number of Records" with
EL. When editing the field, it says that EL is supported, so I tried this:
${record.count:minus(1):multiply(3)}

But the processor does not like this:
Perform Validation
nifi.mydomain.com:8443 - Component is invalid: 'Component' is invalid
because Failed to perform validation due to
java.lang.NumberFormatException: For input string: ""

I get the same error if I just set the EL to: ${record.count}

Is this a bug???

Kind regards
Jens


On Wed, 31 Aug 2022 at 09:24, Jens M. Kofoed <
jmkofoed.ube@gmail.com> wrote:

> Hey Mark
>
> Many thanks for your reply. But it's in fact the Details field which does
> not help me.
> At 08:16:00 all 3 nodes generate a SiteToSiteStatusReport.
> At 08:16:11.003 MergeRecord has a JOIN event, joining 2 files:
> "Records Merged due to: Bin has reached Max Bin Age"
> At 08:16:11.008 MergeRecord has another JOIN event, joining 1 file:
> "Records Merged due to: Bin has reached Max Bin Age"
>
> So one file is 0.005s younger than the other 2 files and therefore is not
> merged into the first bin of files. But how can we force all flowfiles to
> be merged into one flowfile?
> If I set the minimum file size or record count to fall between the totals
> for 2 files and 3 files, it will trigger a merge. But when we create more
> flows, the record count and file size will increase and we will be back to
> the problem that not all files are merged into one.
>
> kind regards
> Jens
>
> On Tue, 30 Aug 2022 at 15:40, Mark Payne <ma...@hotmail.com> wrote:
>
>> Hey Jens,
>>
>> My recommendation is to take a look at the data provenance for
>> MergeRecord (i.e., right-click on the Processor and go to Data Provenance.)
>> Click the little ‘i’ icon on the left for one of the JOIN events.
>> There, it will show a “Details” field, which will tell you why it merged
>> the data in the bin.
>> Once you understand why it’s merging the data with only 2 FlowFiles, you
>> should be able to understand how to adjust your configuration to avoid
>> doing that.
>>
>> Thanks
>> -Mark
>>
>>
>> > On Aug 30, 2022, at 2:31 AM, Jens M. Kofoed <
>> jmkofoed.ube+NIFI@gmail.com> wrote:
>> >
>> > Hi all
>> >
>> > I'm running a 3 node cluster at version 1.16.2. I'm using the
>> SiteToSiteStatusReportingTask to monitor and check for any backpressure or
>> queues. I'm trying to merge all 3 reports into 1, but most of the time I
>> get 2 flowfiles after my MergeRecord.
>> >
>> > To be sure the nodes create the reports at the same time, the
>> SiteToSiteStatusReportingTask is scheduled with the CRON driven strategy
>> every 5 mins.
>> > The connection from the input port to the next processor is set with
>> "Load Balance Strategy" to Single node, to be sure all 3 reports are on the
>> same node.
>> > In my MergeRecord the "Correlation Attribute Name" is set to
>> "reporting.task.uuid", which is the same for all 3 flowfiles.
>> > "Minimum Number of Records" is set to 5000, which is much higher than
>> the total number of records.
>> > "Minimum Bin Size" is set to 5 MB, which is also much higher than the
>> total size. "Maximum Number of Bins" is at default: 10
>> > "Max Bin Age" is set to 10 s.
>> >
>> > With these settings I was hoping that all 3 reports would be on the
>> same node within a few seconds, and that MergeRecord would merge all 3
>> flowfiles into 1. But many times MergeRecord outputs 2 flowfiles.
>> >
>> > Any ideas how to force all into one flowfile?
>> >
>> > Kind regards
>> > Jens M. Kofoed
>>
>>

Re: Need help to merge all records in cluster into one flowfile

Posted by "Jens M. Kofoed" <jm...@gmail.com>.
Hey Mark

Many thanks for your reply. But the Details field is in fact what does
not help me.
At 08:16:00 all 3 nodes generate a SiteToSiteStatusReport.
At 08:16:11.003 MergeRecord has a JOIN event, joining 2 files:
"Records Merged due to: Bin has reached Max Bin Age"
At 08:16:11.008 MergeRecord has another JOIN event, joining 1 file:
"Records Merged due to: Bin has reached Max Bin Age"

So one file is 0.005 s younger than the other 2 files and therefore is not
merged into the first bin. But how can we force all flowfiles to
be merged into one flowfile?
If I set the minimum bin size or record count to a value between the totals
for 2 files and for 3 files, it will trigger a merge. But when we create more
flows, the record count and file size will increase and we will be back to
the problem that not all files are merged into one.

kind regards
Jens
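
The trade-off described above can be sketched with a toy model in Python. This is not MergeRecord's implementation: it assumes a single open bin at a time, flushed either when the record count reaches the minimum or when the bin reaches its maximum age. The arrival times, record counts, and the names Bin and simulate are all made up for illustration. The model does not reproduce the 2 + 1 split seen in provenance; it only contrasts the two flush triggers.

```python
from dataclasses import dataclass


@dataclass
class Bin:
    created_at: float  # seconds, when the first file entered the bin
    files: int = 0
    records: int = 0


def simulate(arrivals, min_records, max_bin_age):
    """Return (flush_time, files_merged, reason) tuples for one open bin at a time.

    arrivals: (arrival_time_seconds, record_count) pairs sorted by time.
    """
    events = []
    current = None
    for t, n in arrivals:
        # An open bin that is already too old is flushed before the new file is binned.
        if current is not None and t - current.created_at >= max_bin_age:
            events.append((current.created_at + max_bin_age, current.files, "max bin age"))
            current = None
        if current is None:
            current = Bin(created_at=t)
        current.files += 1
        current.records += n
        # Reaching the record threshold flushes the bin immediately.
        if current.records >= min_records:
            events.append((t, current.files, "min records"))
            current = None
    # Whatever is still open at the end waits until it ages out.
    if current is not None:
        events.append((current.created_at + max_bin_age, current.files, "max bin age"))
    return events


reports = [(1.0, 100), (1.5, 100), (2.0, 100)]  # made-up arrival times and sizes
print(simulate(reports, min_records=5000, max_bin_age=10.0))
print(simulate(reports, min_records=250, max_bin_age=10.0))
```

In this model a threshold of 250 merges three 100-record reports as soon as the third arrives, while a threshold of 5000 leaves the bin waiting for Max Bin Age. If each report later grows to 200 records, the same threshold of 250 is already reached after two reports and the third is left on its own, which is the problem described above.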

On Tue, Aug 30, 2022 at 15:40, Mark Payne <ma...@hotmail.com> wrote:

> Hey Jens,
>
> My recommendation is to take a look at the data provenance for MergeRecord
> (i.e., right-click on the Processor and go to Data Provenance.) Click the
> little ‘i’ icon on the left for one of the JOIN events.
> There, it will show a “Details” field, which will tell you why it merged
> the data in the bin.
> Once you understand why it’s merging the data with only 2 FlowFiles, you
> should be able to understand how to adjust your configuration to avoid
> doing that.
>
> Thanks
> -Mark
>
>
> > On Aug 30, 2022, at 2:31 AM, Jens M. Kofoed <jm...@gmail.com>
> wrote:
> >
> > Hi all
> >
> > I'm running a 3 node cluster at version 1.16.2. I'm using the
> SiteToSiteStatusReportingTask to monitor and check for any backpressure or
> queues. I'm trying to merge all 3 reports into 1, but most of the time I
> get 2 flowfiles after my MergeRecord.
> >
> > To be sure the nodes create the reports at the same time, the
> SiteToSiteStatusReportingTask is scheduled with the CRON driven strategy
> every 5 mins.
> > The connection from the input port to the next processor is set with "Load
> Balance Strategy" to Single node, to be sure all 3 reports are on the same
> node.
> > In my MergeRecord the "Correlation Attribute Name" is set to
> "reporting.task.uuid", which is the same for all 3 flowfiles.
> > "Minimum Number of Records" is set to 5000, which is much higher than
> the total number of records.
> > "Minimum Bin Size" is set to 5 MB, which is also much higher than the
> total size. "Maximum Number of Bins" is at default: 10
> > "Max Bin Age" is set to 10 s.
> >
> > With these settings I was hoping that all 3 reports would be on the
> same node within a few seconds, and that MergeRecord would merge all 3
> flowfiles into 1. But many times MergeRecord outputs 2 flowfiles.
> >
> > Any ideas how to force all into one flowfile?
> >
> > Kind regards
> > Jens M. Kofoed
>
>

Re: Need help to merge all records in cluster into one flowfile

Posted by Mark Payne <ma...@hotmail.com>.
Hey Jens,

My recommendation is to take a look at the data provenance for MergeRecord (i.e., right-click on the Processor and go to Data Provenance.) Click the little ‘i’ icon on the left for one of the JOIN events.
There, it will show a “Details” field, which will tell you why it merged the data in the bin.
Once you understand why it’s merging the data with only 2 FlowFiles, you should be able to understand how to adjust your configuration to avoid doing that.

Thanks
-Mark


> On Aug 30, 2022, at 2:31 AM, Jens M. Kofoed <jm...@gmail.com> wrote:
> 
> Hi all
> 
> I'm running a 3 node cluster at version 1.16.2. I'm using the SiteToSiteStatusReportingTask to monitor and check for any backpressure or queues. I'm trying to merge all 3 reports into 1, but most of the time I get 2 flowfiles after my MergeRecord.
> 
> To be sure the nodes create the reports at the same time, the SiteToSiteStatusReportingTask is scheduled with the CRON driven strategy every 5 mins.
> The connection from the input port to the next processor is set with "Load Balance Strategy" to Single node, to be sure all 3 reports are on the same node.
> In my MergeRecord the "Correlation Attribute Name" is set to "reporting.task.uuid", which is the same for all 3 flowfiles.
> "Minimum Number of Records" is set to 5000, which is much higher than the total number of records.
> "Minimum Bin Size" is set to 5 MB, which is also much higher than the total size. "Maximum Number of Bins" is at default: 10
> "Max Bin Age" is set to 10 s.
> 
> With these settings I was hoping that all 3 reports would be on the same node within a few seconds, and that MergeRecord would merge all 3 flowfiles into 1. But many times MergeRecord outputs 2 flowfiles.
> 
> Any ideas how to force all into one flowfile?
> 
> Kind regards
> Jens M. Kofoed