Posted to users@nifi.apache.org by Tim Dean <ti...@gmail.com> on 2018/08/28 16:07:41 UTC

MergeContent prematurely binning flow files?

I have a flow that sends a large number of JSON files into a MergeContent processor. The job of that processor is to combine all the incoming flow files with a particular flow file attribute into a single flow file, creating a JSON array containing each of the input flow files’ JSON.

I have configured the MergeContent processor as follows:
Merge Strategy: Bin-Packing Algorithm
Merge Format: Binary Concatenation
Correlation Attribute Name: ${myFlowfileAttributeName}
Minimum number of entries: 1
Maximum number of entries: 5000
Minimum group size: 0 B
Maximum group size: <no value set>
Max bin age: 30 min
Maximum number of bins: 10
Delimiter strategy: Text
Header: [
Footer: ]
Demarcator: ,
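With Binary Concatenation and the Text delimiter strategy, the Header, Footer, and Demarcator above simply wrap and join the raw payloads in each bin. A minimal sketch of that effect (plain Python, not NiFi code; `merge_bin` is a hypothetical name for illustration):

```python
def merge_bin(payloads, header=b"[", footer=b"]", demarcator=b","):
    """Join the binned payloads with the demarcator, wrapped in header/footer."""
    return header + demarcator.join(payloads) + footer

# Two JSON object payloads merge into one valid JSON array:
merged = merge_bin([b'{"id": 1}', b'{"id": 2}'])
# merged == b'[{"id": 1},{"id": 2}]'
```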

When I run data through this flow, I see a large number of small-ish merged flow files being sent to the merged relationship. I was expecting ALL of the files for a given flow file attribute value to be binned together, but they are not coming through that way. To give an example, I pushed through data containing 262 input JSON files. Of these 262, 2 have a flow file attribute value of ‘A’, 2 have a value of ‘B’, and 258 have a value of ‘C’. I was expecting the merged relationship to deliver 3 flow files, one each for values A, B, and C. But I am seeing 24 flow files on the merged relationship: 1 for value A, 1 for value B, and 22 of varying sizes with value C.

Can someone help me understand what other criteria MergeContent might be using to determine when to send along its merged flow files?

Thanks

Re: MergeContent prematurely binning flow files?

Posted by Tim Dean <ti...@gmail.com>.
Thanks for the suggestion Juan.

I tried to make that change and it didn’t seem to make a significant difference. The breakdown into bins was a little different - different numbers of flow files were merged together - but the end result was the same.

-Tim

> On Aug 28, 2018, at 11:11 AM, Juan Sequeiros <he...@gmail.com> wrote:
> 
> Hi,
> 
> For:
> 
> Correlation Attribute Name: ${myFlowfileAttributeName}
> Should be set to: myFlowfileAttributeName
> 
> NOT ${myFlowfileAttributeName}
> 
> Hope that helps.
> 


Re: MergeContent prematurely binning flow files?

Posted by Juan Sequeiros <he...@gmail.com>.
Hi,

For:


   - Correlation Attribute Name: ${myFlowfileAttributeName}

Should be set to: myFlowfileAttributeName

NOT ${myFlowfileAttributeName}

Hope that helps.
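To illustrate the point: the Correlation Attribute Name property supports Expression Language, so `${myFlowfileAttributeName}` is first evaluated to the attribute's value, and that result is then treated as the name of the attribute to correlate on. A rough sketch of that resolution (a hypothetical Python model, not NiFi internals):

```python
def correlation_key(attributes, property_value):
    """Hypothetical model of how the property resolves per flow file."""
    if property_value.startswith("${") and property_value.endswith("}"):
        # Expression Language evaluates to the attribute's VALUE...
        attr_name = attributes.get(property_value[2:-1], "")
        # ...and that value is then used as the attribute NAME to group on.
        return attributes.get(attr_name, "")
    return attributes.get(property_value, "")

attrs = {"myFlowfileAttributeName": "C"}
correlation_key(attrs, "myFlowfileAttributeName")     # "C": groups by value
correlation_key(attrs, "${myFlowfileAttributeName}")  # "": no attribute named "C"
```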


-- 
Juan Carlos Sequeiros

Re: MergeContent prematurely binning flow files?

Posted by Tim Dean <ti...@gmail.com>.
Thanks Joe - this seems to be working for me.

- Tim

Sent from my iPhone

> On Aug 28, 2018, at 1:58 PM, Joe Witt <jo...@gmail.com> wrote:
> 
> Tim,
> 
> I don't recall what edge/bounds checking we do on that.  But your logic
> seems good.
> 
> Rgr that on the MergeRecord comment.
> 
> Thanks

Re: MergeContent prematurely binning flow files?

Posted by Joe Witt <jo...@gmail.com>.
Tim,

I don't recall what edge/bounds checking we do on that.  But your logic
seems good.

Rgr that on the MergeRecord comment.

Thanks
On Tue, Aug 28, 2018 at 2:51 PM Tim Dean <ti...@gmail.com> wrote:
>
> Thanks Joe - I think I can make that work.
>
> Is there any reason why I should avoid setting the minimum and maximum values (for both number of entries and group size) to the same values? That way I think it would always be the timer that triggers the new flow file, unless I get more input files (or a larger total amount of content size) than I am configured to allow.
>
> FYI - I would convert to using MergeRecord but I don’t have a schema I can refer to for this data. It seems like MergeContent is the only way to handle more of a “free form” JSON structure like my input data has.
>
> -Tim
>

Re: MergeContent prematurely binning flow files?

Posted by Tim Dean <ti...@gmail.com>.
Thanks Joe - I think I can make that work.

Is there any reason why I should avoid setting the minimum and maximum values (for both number of entries and group size) to the same values? That way I think it would always be the timer that triggers the new flow file, unless I get more input files (or a larger total amount of content size) than I am configured to allow.

FYI - I would convert to using MergeRecord but I don’t have a schema I can refer to for this data. It seems like MergeContent is the only way to handle more of a “free form” JSON structure like my input data has.

-Tim

> On Aug 28, 2018, at 12:37 PM, Joe Witt <jo...@gmail.com> wrote:
> 
> Tim
> 
> Yeah so I think you want to set it like the following roughly
> 
> Merge Strategy: Bin-Packing Algorithm
> Merge Format: Binary Concatenation
> Correlation Attribute Name: myFlowfileAttributeName
> Minimum number of entries: 2000
> Maximum number of entries: 5000
> Minimum group size: 1 MB
> Maximum group size: 10 MB
> Max bin age: 5 min
> Maximum number of bins: 50
> Delimiter strategy: Text
> Header: [
> Footer: ]
> Demarcator: ,
> 
> With this configuration you should end up with all the items together
> that have the same correlation attribute value in a given 5 minute
> window.  Once an object enters the bucket for a given value the 5
> minute timer starts.  Either the minimum number of objects or size is
> reached and it gets out right away or the minimums are not reached and
> it will get kicked out based on the 5 min timer.
> 
> Lastly, consider switching to using MergeRecord and having JSON
> readers/writers in it.  It will take care of the framing you're trying
> to do with these demarcators.
> 
> Thanks
> Joe


Re: MergeContent prematurely binning flow files?

Posted by Joe Witt <jo...@gmail.com>.
Tim

Yeah so I think you want to set it like the following roughly

Merge Strategy: Bin-Packing Algorithm
Merge Format: Binary Concatenation
Correlation Attribute Name: myFlowfileAttributeName
Minimum number of entries: 2000
Maximum number of entries: 5000
Minimum group size: 1 MB
Maximum group size: 10 MB
Max bin age: 5 min
Maximum number of bins: 50
Delimiter strategy: Text
Header: [
Footer: ]
Demarcator: ,

With this configuration you should end up with all the items together
that have the same correlation attribute value in a given 5 minute
window.  Once an object enters the bucket for a given value the 5
minute timer starts.  Either the minimum number of objects or size is
reached and it gets out right away or the minimums are not reached and
it will get kicked out based on the 5 min timer.

Lastly, consider switching to using MergeRecord and having JSON
readers/writers in it.  It will take care of the framing you're trying
to do with these demarcators.

Thanks
Joe
On Tue, Aug 28, 2018 at 1:08 PM Tim Dean <ti...@gmail.com> wrote:
>
> Thanks Joe - Your explanation makes sense.
>
> I’m now concerned that MergeContent won’t do what I want it to do. In my use case what I really want is to gather ALL the files that come in with a matching attribute value. There could be just one of them, or there could be a couple thousand of them. On average there will be dozens or low hundreds. Flowfiles with matching attribute values will tend to come in around the same time as each other, with some variation due to network and other issues. So what I really want is something like:
>
> When I see a new value in the flow file attribute, begin a new bin
> Allow that bucket to receive as many incoming flow files as it needs to (subject to a maximum as needed to constrain memory usage)
> When no new flow files with a matching attribute value have come in for a configurable duration (e.g. 5 minutes), merge all of the bin’s contents together and move it on to the next processor.
>
>
> Is there a better way to do this in NiFi?
>
> -Tim
>

Re: MergeContent prematurely binning flow files?

Posted by Tim Dean <ti...@gmail.com>.
Thanks Joe - Your explanation makes sense.

I’m now concerned that MergeContent won’t do what I want it to do. In my use case what I really want is to gather ALL the files that come in with a matching attribute value. There could be just one of them, or there could be a couple thousand of them. On average there will be dozens or low hundreds. Flowfiles with matching attribute values will tend to come in around the same time as each other, with some variation due to network and other issues. So what I really want is something like:
1. When I see a new value in the flow file attribute, begin a new bin.
2. Allow that bin to receive as many incoming flow files as needed (subject to a maximum, to constrain memory usage).
3. When no new flow files with a matching attribute value have come in for a configurable duration (e.g. 5 minutes), merge all of the bin’s contents together and move the result on to the next processor.
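In pseudo-code terms, the behavior I’m describing would look something like this (a hypothetical sketch in Python, not anything NiFi provides):

```python
import time
from collections import defaultdict

class InactivityBinner:
    """Hypothetical sketch: a bin is merged only once no new flow file with
    its correlation value has arrived for quiet_secs, or once it hits a
    max-entries cap to bound memory use."""

    def __init__(self, quiet_secs=300, max_entries=5000, clock=time.monotonic):
        self.quiet_secs = quiet_secs
        self.max_entries = max_entries
        self.clock = clock                 # injectable clock, handy for testing
        self.bins = defaultdict(list)      # correlation value -> flow files
        self.last_seen = {}                # correlation value -> last arrival time

    def offer(self, key, flowfile):
        """Add a flow file to its bin; return the bin if the cap is hit."""
        self.bins[key].append(flowfile)
        self.last_seen[key] = self.clock()
        if len(self.bins[key]) >= self.max_entries:
            del self.last_seen[key]
            return self.bins.pop(key)
        return None

    def flush_idle(self):
        """Return {key: files} for every bin quiet for at least quiet_secs."""
        cutoff = self.clock() - self.quiet_secs
        idle = [k for k, t in self.last_seen.items() if t <= cutoff]
        for k in idle:
            del self.last_seen[k]
        return {k: self.bins.pop(k) for k in idle}
```

A scheduler would call flush_idle() periodically. Note this is not how MergeContent works: none of the settings listed above is a per-bin inactivity timer.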

Is there a better way to do this in NiFi?

-Tim

> On Aug 28, 2018, at 11:15 AM, Joe Witt <jo...@gmail.com> wrote:
> 
> Tim,
> 
> This processor is powerful and its configuration is very specific.
> 
> That is a fancy way of saying this beast is complicated.
> 
> First, can you highlight which version of NiFi you're using?
> 
> Let's look at your settings that would cause a group of items to get
> kicked out as a merge result:
> 
> 'minimum number of entries' - you have it at 1.  This means that once
> a given bin contains at least one entry it is eligible/good enough to
> go.  On a given merge session it will put more than 1 in there, but
> that will be based on how many it has pulled at once.  Still, it
> sounds like you want more than 1.
> 
> 'minimum group size' - you have it at 0 B.  By the same logic as
> above, this is likely much smaller than you intended.
> 
> Correlation attribute name: As Juan pointed out, this should not be an
> expression language statement if you're trying to give the name of an
> attribute - unless the name of the attribute you want is itself the
> result of evaluating that expression.  This isn't consistent with some
> other cases, so in hindsight we probably should have made it work
> differently.
> 
> max number of bins:
> If you have ten bins currently being built up and a new one is needed,
> it will kick out the oldest bin as 'good enough'.  Consider making
> this larger than 10, but if you know there aren't more than 10 needed
> then you're good.  You also don't want to go wild with this value
> either, as it can result in more memory usage than necessary.
> 
> Thanks
> 
> 
> On Tue, Aug 28, 2018 at 12:07 PM Tim Dean <ti...@gmail.com> wrote:
>> 
>> I have a flow that sends a large number of JSON files into a MergeContent processor. The job of that processor is to combine all the incoming flow files with a particular flow file attribute into a single flow file, creating a JSON array containing each of the input flow files’ JSON.
>> 
>> I have configured the MergeContent processor as follows:
>> 
>> Merge Strategy: Bin-Packing Algorithm
>> Merge Format: Binary Concatenation
>> Correlation Attribute Name: ${myFlowfileAttributeName}
>> Minimum number of entries: 1
>> Maximum number of entries: 5000
>> Minimum group size: 0 B
>> Maximum group size: <no value set>
>> Max bin age: 30 min
>> Maximum number of bins: 10
>> Delimiter strategy: Text
>> Header: [
>> Footer: ]
>> Demarcator: ,
>> 
>> 
>> When I run data through this flow, I am seeing a large number of small-ish merged flow files being sent to the merged relationship. I was expecting ALL of the files for a given flow file attribute value to be binned together, but they are not coming through that way. To give an example, I pushed through data containing 262 input JSON files. Of these 262, 2 have a flow file attribute value of ‘A’, 2 have a value of ‘B’, and 258 have a value of ‘C’. I was expecting the merged relationship to deliver 3 flow files, one each for values A, B, and C. But I am seeing 24 flow files on the merged relationship: 1 for value A, 1 for value B, and 22 of varying sizes for value C.
>> 
>> Can someone help me understand what other criteria MergeContent might be using to determine when to send along its merged flow files?
>> 
>> Thanks


Re: MergeContent prematurely binning flow files?

Posted by Joe Witt <jo...@gmail.com>.
Tim,

This processor is powerful and its configuration is very specific.

That is a fancy way of saying this beast is complicated.

First, can you highlight which version of NiFi you're using?

Let's look at your settings that would cause a group of items to get
kicked out as a merge result:

'minimum number of entries' - you have it at 1.  This means that once
a given bin contains at least one entry it is eligible/good enough to
go.  On a given merge session it will put more than 1 in there, but
that will be based on how many it has pulled at once.  Still, it
sounds like you want more than 1.

'minimum group size' - you have it at 0 B.  By the same logic as
above, this is likely much smaller than you intended.
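In rough terms the eligibility check looks like this (an illustrative sketch, not the actual MergeContent source):

```python
def bin_is_eligible(entry_count, total_bytes, min_entries=1, min_size=0):
    """A bin may be merged once BOTH minimums are satisfied.
    With min_entries=1 and min_size=0 B, any non-empty bin qualifies."""
    return entry_count >= min_entries and total_bytes >= min_size

print(bin_is_eligible(1, 120))                   # True: the defaults let every bin go
print(bin_is_eligible(1, 120, min_entries=250))  # False: the bin keeps accumulating
```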

Correlation attribute name: As Juan pointed out, this should not be an
expression language statement if you're trying to give the name of an
attribute - unless the name of the attribute you want is itself the
result of evaluating that expression.  This isn't consistent with some
other cases, so in hindsight we probably should have made it work
differently.
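To illustrate the indirection (a hypothetical sketch - a plain dict stands in for a flow file's attributes, and the EL detection is deliberately simplified; this is not the MergeContent code):

```python
def correlation_key(attributes, property_value):
    """If the property is an EL statement like '${myFlowfileAttributeName}',
    it first evaluates to that attribute's VALUE, and the result is then
    treated as the NAME of the attribute to correlate on."""
    if property_value.startswith("${") and property_value.endswith("}"):
        # Expression language: resolve to the attribute's value first...
        resolved_name = attributes.get(property_value[2:-1])
        # ...and that value is then used as the attribute name to look up.
        return attributes.get(resolved_name)
    # Plain attribute name: one direct lookup, which is usually what you want.
    return attributes.get(property_value)

attrs = {"myFlowfileAttributeName": "C"}
print(correlation_key(attrs, "myFlowfileAttributeName"))     # C
print(correlation_key(attrs, "${myFlowfileAttributeName}"))  # None: no attribute named "C"
```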

max number of bins:
If you have ten bins currently being built up and a new one is needed,
it will kick out the oldest bin as 'good enough'.  Consider making
this larger than 10, but if you know there aren't more than 10 needed
then you're good.  You also don't want to go wild with this value
either, as it can result in more memory usage than necessary.
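Roughly (again an illustrative sketch, not the real implementation):

```python
from collections import OrderedDict

def offer(bins, key, flowfile, max_bins=10):
    """Sketch of max-bins eviction: when a new correlation value needs a
    bin and every bin slot is taken, the oldest bin is forced out as a
    merge result even though it may still be accumulating."""
    evicted = None
    if key not in bins and len(bins) >= max_bins:
        oldest_key, oldest_files = bins.popitem(last=False)  # oldest bin goes
        evicted = (oldest_key, oldest_files)
    bins.setdefault(key, []).append(flowfile)
    return evicted

bins = OrderedDict()
for k in "ABCDEFGHIJ":                 # fill all 10 bin slots
    offer(bins, k, "file-" + k)
print(offer(bins, "K", "file-K"))      # ('A', ['file-A']): oldest bin kicked out
```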

Thanks

