Posted to users@nifi.apache.org by Martijn Dekkers <ma...@dekkers.org.uk> on 2018/06/05 04:27:06 UTC

Re: Only get file when a set exists.

Hello Koji,

Many thanks, apologies for the delay in responding - I had to work on some
different tasks.

I have followed your advice and have configured a flow accordingly, and on
the whole the logic works. However, I still see the issue where a set will
be stuck in the wait queue. I have tracked it down to the instance where
there is a longer delay between the arrival of ext1 and ext2 files. If I
pause the appropriate processor that gates the ext2 files, that set will
get stuck. If all files come through roughly at a similar time, we see no
issues, and the flow happily runs.

I am not quite sure about the best way to debug this. I have looked at the
attributes in provenance, and notice that the relevant counter for the
specific wait processor isn't updated. I am not sure how I can check the
status of the distributed map cache to see if this might be responsible.

I can share my flowfile, but would have to email it to you directly;
unfortunately I cannot share the flowfile publicly, and sanitising it to
the extent that I could share it publicly would be difficult.

Oh, and we are using NiFi 1.6.

Many thanks,

Martijn

On 31 May 2018 at 09:57, Koji Kawamura <ij...@gmail.com> wrote:

> BTW, which version are you using? I hope it is 1.4.0 or higher. There
> was an issue that affected this kind of usage.
> https://issues.apache.org/jira/browse/NIFI-4028
>
> On Thu, May 31, 2018 at 4:51 PM, Koji Kawamura <ij...@gmail.com>
> wrote:
> > Hi Martijn,
> >
> > I used the filename increment pattern based on your first filename
> > example: file_123_456_ab.ex1
> > I increment the 456 part. If that part is different in your case, that's fine.
> >
> > Your current configurations look like below:
> > - Given a filename: file_123_type3.ext1
> > - matched with a RegEx: ^file_(\d{3}_)(\w+)\.ext1$
> > - groupID will be: 123_ (including the underscore)
> > - counterName will be: type3
> >
> > I was suggesting to include the extension in the counterName.
> > How about changing the RegEx as:
> > - RegEx: ^file_(\d+)_(\w+\.ext\d)$
> > - groupID will be: 123
> > - counterName will be: type3.ext1
> >
> > Then you can route type1.ext1 to Wait branch and other 7 to Notify.
> > In Wait branch, you need 7 Wait processors.
> >
> > It would be faster to debug if you could share your flow template.
> >
> > Thanks,
> > Koji
> >
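A quick way to sanity-check the RegEx suggested above outside NiFi is a few lines of Python (a minimal sketch using only the re module; the filenames follow the examples in this thread):

    import re

    # Suggested pattern: groupID in group 1, counterName (type plus extension) in group 2.
    pattern = re.compile(r'^file_(\d+)_(\w+\.ext\d)$')

    for name in ['file_123_type1.ext1', 'file_123_type1.ext2', 'file_124_type4.ext2']:
        m = pattern.match(name)
        if m:
            print(name, '-> groupID =', m.group(1), ', counterName =', m.group(2))
        else:
            print(name, '-> no match')

For file_123_type1.ext1 this prints groupID = 123 and counterName = type1.ext1, which is the split described above; regex101.com, as recommended elsewhere in the thread, does the same check interactively.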
> > On Thu, May 31, 2018 at 3:15 PM, Martijn Dekkers <ma...@dekkers.org.uk>
> wrote:
> >> Thank you Koji,
> >>
> >> I have tried once again, using your updated example. Unfortunately, things
> >> still get stuck at the first Wait processor's wait queue.
> >> I did notice that the format of the files your example generates is
> >> different. I will try to clarify:
> >>
> >> - 8 files in total:
> >>
> >> -- file_123_type1.ext1
> >> -- file_123_type1.ext2
> >>
> >> -- file_123_type2.ext1
> >> -- file_123_type2.ext2
> >>
> >> -- file_123_type3.ext1
> >> -- file_123_type3.ext2
> >>
> >> -- file_123_type4.ext1
> >> -- file_123_type4.ext2
> >>
> >> For each set of 8 files, "file_123" increments, so the first set of 8 is
> >> "file_123", and the next set is "file_124" and so on.
> >>
> >> When I look at your example, I notice that at the final step (LogAttribute
> >> after the FetchFile set) the filenames are file_123_<incrementing number>.ex(1|2)
> >>
> >> My UpdateAttribute before the Notify branch is configured as:
> >> groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')}
> >> counterName - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$2')}
> >>
> >> The UpdateAttribute before the Wait branch is configured as:
> >> groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')}
> >>
> >> The 4 Wait processors in the Wait branch are configured as:
> >> All Wait processors:
> >> Release Signal Identifier - ${groupID}
> >>
> >> For each individual Wait processor:
> >> Signal Counter Name - type1
> >> Signal Counter Name - type2
> >> Signal Counter Name - type3
> >> Signal Counter Name - type4
> >>
> >> I am a bit stumped. The best success we had was a configuration with a
> >> RouteOnAttribute sending each of type1|type2|type3|type4 to their own Wait
> >> processor, and a similar config for the Notify branch, followed by a final
> >> Wait/Notify pair that simply ensures we have the correct number of sets.
> >>
> >> This configuration did exactly what we wanted, but unfortunately we had random
> >> flowfiles stuck in the wait queue for no apparent reason.
> >>
> >> Thanks,
> >>
> >> Martijn
> >>
> >>
> >>
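The UpdateAttribute expressions quoted above can also be checked outside NiFi. A minimal Python approximation (re.sub with count=1 roughly mirrors replaceFirst; note that, like Java's String.replaceFirst, it returns the input unchanged when the pattern does not match, which matters when the same .ext1-anchored expression is applied to .ext2 filenames):

    import re

    def replace_first(value, regex, replacement):
        # Rough stand-in for the EL replaceFirst(); Python uses \1 where NiFi EL uses $1.
        # If the regex does not match, the value is returned unchanged.
        return re.sub(regex, replacement, value, count=1)

    for name in ['file_123_type3.ext1', 'file_123_type3.ext2']:
        group_id = replace_first(name, r'^file_(\d{3}_)(\w+)\.ext1$', r'\1')
        counter_name = replace_first(name, r'^file_(\d{3}_)(\w+)\.ext1$', r'\2')
        print(name, '-> groupID =', group_id, ', counterName =', counter_name)

For the .ext1 filename this yields groupID = 123_ and counterName = type3; for the .ext2 filename neither expression matches, so both attributes end up holding the whole filename.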
> >> On 31 May 2018 at 05:23, Koji Kawamura <ij...@gmail.com> wrote:
> >>>
> >>> The order of arrival does not matter.
> >>>
> >>> Wait processor has 'Expiration Duration' configuration, defaults to 10
> >>> min. Please adjust it according to your needs, the longest period to
> >>> wait for a delayed file.
> >>> If a FlowFile exceeds the duration, it will be sent to 'expired'
> >>> relationship, and can be treated differently, e.g. write ERROR log
> >>>
> >>> > If we have a longer wait for a file, we'd like processing for the next
> >>> > groupid to still be able to continue.
> >>>
> >>> In order to achieve that, use Wait Mode = 'Transfer to wait
> >>> relationship', and the 'wait' relationship should be configured to use
> >>> FirstInFirstOutPrioritizer.
> >>> If FirstInFirstOutPrioritizer is not set, the same FlowFile will be
> >>> processed again while it blocks other FlowFiles.
> >>> With FirstInFirstOutPrioritizer, the processed FlowFile will be
> >>> re-queued at the end of wait queue.
> >>>
> >>> I've updated my example to make it more realistic, by adding delay for
> >>> certain set and type.
> >>> https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd
> >>>
> >>> Thanks,
> >>> Koji
> >>>
> >>> On Thu, May 31, 2018 at 10:56 AM, Martijn Dekkers
> >>> <ma...@dekkers.org.uk> wrote:
> >>> > Cool, that will make things a lot simpler. Does it matter that the
> ext2
> >>> > files arrive in random order? Sometimes there can be a very long
> delay
> >>> > in
> >>> > some of them showing up, and we have some concerns about the overall
> >>> > flow
> >>> > blocking. If we have a longer wait for a file, we'd like processing
> for
> >>> > the
> >>> > next groupid to still be able to continue.
> >>> >
> >>> > Thank you for your help (and for writing Wait/Notify!!)
> >>> >
> >>> > Martijn
> >>> >
> >>> > On 31 May 2018 at 03:49, Koji Kawamura <ij...@gmail.com>
> wrote:
> >>> >>
> >>> >> Glad to hear that was helpful.
> >>> >>
> >>> >> "4 same type for each extension", can be treated as "8 distinct
> types"
> >>> >> if an extension is included in a type.
> >>> >> ab.ex1, cd.ex1, ef.ex1, gh.ex1, ab.ex2, cd.ex2, ef.ex2, gh.ex2
> >>> >>
> >>> >> Then route only 'ab.ex1' (or whichever but just 1 of them) to the
> Wait
> >>> >> branch, and the rest to Notify branch.
> >>> >> That will simplify the flow, if I'm not missing any other
> requirement.
> >>> >>
> >>> >> Thanks!
> >>> >> Koji
> >>> >>
> >>> >> On Thu, May 31, 2018 at 10:30 AM, Martijn Dekkers
> >>> >> <ma...@dekkers.org.uk> wrote:
> >>> >> > Hi Koji, Many thanks for your continued assistance!
> >>> >> >
> >>> >> >>
> >>> >> >> - 1 file per second is relatively low in terms of traffic, it
> should
> >>> >> >> be processed fine with 1 thread
> >>> >> >> - A flow like this, which is stateful across different parts of
> the
> >>> >> >> flow works at best with single thread, because using multiple
> >>> >> >> threads
> >>> >> >> would cause race condition or concurrency issue if there's any
> >>> >> >> implementation error
> >>> >> >
> >>> >> >
> >>> >> > Yes, we had similar thoughts.
> >>> >> >
> >>> >> >>
> >>> >> >> - Based on above, I strongly recommend to NOT increase
> "concurrent
> >>> >> >> tasks". If you see FlowFiles staying in a wait queue, then there
> >>> >> >> must
> >>> >> >> be different issue
> >>> >> >
> >>> >> >
> >>> >> > We don't see many flowfiles stuck in a wait queue, I ran a test
> over
> >>> >> > a
> >>> >> > few
> >>> >> > hours yesterday that simulates the way in which these files would
> >>> >> > appear
> >>> >> > (we
> >>> >> > would have 4 of "ext1" show up every second, and the "ext2" can
> show
> >>> >> > up
> >>> >> > a
> >>> >> > few seconds later, and not always in the same order) and we found
> >>> >> > perhaps 6
> >>> >> > flowfiles stuck in a wait queue.
> >>> >> >
> >>> >> >>
> >>> >> >> - Also, using concurrent tasks number like 400 is too much in
> >>> >> >> general
> >>> >> >> for all processors. I recommend to increment it as 2, 3, 4 .. up
> to
> >>> >> >> 8
> >>> >> >> or so, only if you see the clear benefit by doing so
> >>> >> >
> >>> >> >
> >>> >> > Indeed, thanks for the suggestion. Once we have the logic finished and
> >>> >> > tested we will have to optimise this flow. The next step is to try to
> >>> >> > load the required processors into MiNiFi, as this will be running on
> >>> >> > many systems with limited capacity. If we don't manage with MiNiFi, we
> >>> >> > will still be good, but we prefer to have the smaller footprint and
> >>> >> > ease of management we can obtain with MiNiFi.
> >>> >> >
> >>> >> >>
> >>> >> >> - The important part of this flow is extracting 'groupId' and
> 'type'
> >>> >> >> from file names. Regular Expression needs to be configured
> properly.
> >>> >> >> - I recommend using https://regex101.com/ to test your Regular
> >>> >> >> Expression to see whether it can extract correct groupId and type
> >>> >> >
> >>> >> >
> >>> >> > Yes, we have tested our RegExes for this extensively
> >>> >> >
> >>> >> >>
> >>> >> >>
> >>> >> >> Lastly, regardless of how many files should be there for 'ext1'
> and
> >>> >> >> 'ext2', the flow structure is simple as below.
> >>> >> >> Let's say there should be 8 files to start processing those.
> >>> >> >> 4 x ex1, and 4 ex2 in your case, but let's think it as 8 file
> types.
> >>> >> >> And I assume the types are known, meaning, static, not
> dynamically
> >>> >> >> change.
> >>> >> >
> >>> >> >
> >>> >> > Correct, the format is <groupID><type>.<ext> where:
> >>> >> >
> >>> >> > - groupId is unique for each set of 8
> >>> >> > - type has 4 variants (ab, cd, ef, gh), the same type-set for each
> >>> >> > ext
> >>> >> >
> >>> >> >> So, the rule is, "a set of files consists of 8 files, and a set
> >>> >> >> should
> >>> >> >> wait to be processed until all 8 files are ready", that's all.
> >>> >> >
> >>> >> >
> >>> >> > For our use case it is important that we have exact positive
> >>> >> > identification of each file.
> >>> >> >
> >>> >> >>
> >>> >> >> Then, the flow should be designed like below:
> >>> >> >> 1. List files, each file will be sent as a FlowFile
> >>> >> >
> >>> >> >
> >>> >> > Correct - we have several different listfiles for other sections
> of
> >>> >> > the
> >>> >> > flow, we are actually collecting many different sets, all
> variants of
> >>> >> > the
> >>> >> > above. However, those are far simpler (sets of 2 - ext1 and ext2
> >>> >> > only)
> >>> >> >
> >>> >> >>
> >>> >> >> 2. Extract groupId and type from filename
> >>> >> >
> >>> >> >
> >>> >> > Correct
> >>> >> >
> >>> >> >>
> >>> >> >> 3. Route FlowFiles into two branches, let's call these 'Notify'
> >>> >> >> branch
> >>> >> >> and 'Wait' branch, and pass only 1 type for a set to Wait-branch,
> >>> >> >> and
> >>> >> >> the rest 7 types to Notify-branch
> >>> >> >
> >>> >> >
> >>> >> > OK, we currently split Notify branch to "all ext1" and wait
> branch to
> >>> >> > "all
> >>> >> > ext2"
> >>> >> >
> >>> >> >>
> >>> >> >> At Notify branch (for the rest 7 types FlowFile, e.g. type 2, 3,
> 4
> >>> >> >> ...
> >>> >> >> 8)
> >>> >> >
> >>> >> >
> >>> >> > As mentioned, we only have 4 distinct types.
> >>> >> >
> >>> >> >>
> >>> >> >> 1. Notify that the type for a group has arrived.
> >>> >> >> 2. Discard the FlowFile, because there's nothing to do with it in
> >>> >> >> this
> >>> >> >> branch
> >>> >> >
> >>> >> >
> >>> >> >
> >>> >> >>
> >>> >> >> At Wait branch (for the type 1 FlowFile)
> >>> >> >> 1. Wait for type 2 for the groupId.
> >>> >> >> 2. Wait for type 3 for the groupId, type 4, 5 and so on
> >>> >> >> 3. After passing Wait for type 8, it can guarantee that all 8
> files
> >>> >> >> are available (unless there is any other program deleting those)
> >>> >> >> 4. Get actual file content using FetchFile, and process it
> >>> >> >
> >>> >> >
> >>> >> > Besides the "4 same types for each extension", this is configured
> as
> >>> >> > you
> >>> >> > describe.
> >>> >> >
> >>> >> >>
> >>> >> >> I hope it helps.
> >>> >> >>
> >>> >> >
> >>> >> > It does, thanks. I will extract this portion of the flow,
> sanitise,
> >>> >> > and
> >>> >> > send
> >>> >> > it along - easier to see than to describe :)
> >>> >> >
> >>> >> >
> >>> >> >>
> >>> >> >> Thanks,
> >>> >> >> Koji
> >>> >> >
> >>> >> >
> >>> >> > Thank you so much once again!
> >>> >> >
> >>> >> > Martijn
> >>> >> >
> >>> >> >
> >>> >> >
> >>> >> >>
> >>> >> >>
> >>> >> >> On Wed, May 30, 2018 at 6:10 PM, Martijn Dekkers
> >>> >> >> <ma...@dekkers.org.uk>
> >>> >> >> wrote:
> >>> >> >> > Hey Pierre,
> >>> >> >> >
> >>> >> >> > Yes, we suspected as much, but we are only seeing this with the
> >>> >> >> > Wait
> >>> >> >> > processor. Possibly because that is the only "blocking" we
> have in
> >>> >> >> > this
> >>> >> >> > flow.
> >>> >> >> >
> >>> >> >> > Thanks for the clarification, much appreciated!
> >>> >> >> >
> >>> >> >> > Martijn
> >>> >> >> >
> >>> >> >> > On 30 May 2018 at 10:30, Pierre Villard
> >>> >> >> > <pi...@gmail.com>
> >>> >> >> > wrote:
> >>> >> >> >>
> >>> >> >> >> I'll let Koji give more information about the Wait/Notify, he
> is
> >>> >> >> >> clearly
> >>> >> >> >> the expert here.
> >>> >> >> >>
> >>> >> >> >> I'm just jumping in regarding your "and when viewing the
> queue,
> >>> >> >> >> the
> >>> >> >> >> dialog
> >>> >> >> >> states that the queue is empty.". You're seeing this behavior
> >>> >> >> >> because,
> >>> >> >> >> even
> >>> >> >> >> though the UI shows some flow files in the queue, the flow
> files
> >>> >> >> >> are
> >>> >> >> >> currently locked in the session of the running processor and
> you
> >>> >> >> >> won't
> >>> >> >> >> see
> >>> >> >> >> flow files currently processed in a session when listing a
> queue.
> >>> >> >> >> If
> >>> >> >> >> you
> >>> >> >> >> stop the processor, the session will be closed and you'll be
> able
> >>> >> >> >> to
> >>> >> >> >> list
> >>> >> >> >> the queue and see the flow files.
> >>> >> >> >>
> >>> >> >> >> I recall discussions in the past to improve the UX for this.
> Not
> >>> >> >> >> sure
> >>> >> >> >> we
> >>> >> >> >> have a JIRA for it though...
> >>> >> >> >>
> >>> >> >> >> Pierre
> >>> >> >> >>
> >>> >> >> >> 2018-05-30 8:26 GMT+02:00 Martijn Dekkers
> >>> >> >> >> <ma...@dekkers.org.uk>:
> >>> >> >> >>>
> >>> >> >> >>> Hi Koji,
> >>> >> >> >>>
> >>> >> >> >>> Thank you for responding. I had adjusted the run schedule to
> >>> >> >> >>> closely
> >>> >> >> >>> mimic our environment. We are expecting about 1 file per
> second
> >>> >> >> >>> or
> >>> >> >> >>> so.
> >>> >> >> >>> We are also seeing some random "orphans" sitting in a wait
> queue
> >>> >> >> >>> every
> >>> >> >> >>> now and again that don't trigger a debug message, and when
> >>> >> >> >>> viewing
> >>> >> >> >>> the
> >>> >> >> >>> queue, the dialog states that the queue is empty.
> >>> >> >> >>>
> >>> >> >> >>> We found the random "no signal found" issue to be
> significantly
> >>> >> >> >>> decreased
> >>> >> >> >>> when we increase the "concurrent tasks" to something large -
> >>> >> >> >>> currently
> >>> >> >> >>> set
> >>> >> >> >>> to 400 for all wait and notify processors.
> >>> >> >> >>>
> >>> >> >> >>> I do need to mention that our requirements had changed since
> you
> >>> >> >> >>> made
> >>> >> >> >>> the
> >>> >> >> >>> template, in that we are looking for a set of 8 files - 4 x
> >>> >> >> >>> "ext1"
> >>> >> >> >>> and
> >>> >> >> >>> 4 x
> >>> >> >> >>> "ext2" both with the same pattern: <groupid><type (4 of
> >>> >> >> >>> these)>.ext1
> >>> >> >> >>> or ext2
> >>> >> >> >>>
> >>> >> >> >>> We found that the best way to make this work was to add
> another
> >>> >> >> >>> wait/notify pair, each processor coming after the ones
> already
> >>> >> >> >>> there,
> >>> >> >> >>> with a
> >>> >> >> >>> simple counter against the groupID.
> >>> >> >> >>>
> >>> >> >> >>> I will export a template for you, many thanks for your help
> - I
> >>> >> >> >>> just
> >>> >> >> >>> need
> >>> >> >> >>> to spend some time sanitising the varies fields etc.
> >>> >> >> >>>
> >>> >> >> >>> Many thanks once again for your kind assistance.
> >>> >> >> >>>
> >>> >> >> >>> Martijn
> >>> >> >> >>>
> >>> >> >> >>> On 30 May 2018 at 08:14, Koji Kawamura <
> ijokarumawak@gmail.com>
> >>> >> >> >>> wrote:
> >>> >> >> >>>>
> >>> >> >> >>>> Hi Martijn,
> >>> >> >> >>>>
> >>> >> >> >>>> In my template, I was using 'Run Schedule' as '5 secs' for
> the
> >>> >> >> >>>> Wait
> >>> >> >> >>>> processors to avoid overusing CPU resource. However, if you
> >>> >> >> >>>> expect
> >>> >> >> >>>> more throughput, it should be lowered.
> >>> >> >> >>>> I changed Run Schedule to 0 sec and passed 100 groups of files
> >>> >> >> >>>> (400 files, because 4 files make 1 set in my example); they
> >>> >> >> >>>> reached the expected goal of the flow without issue.
> >>> >> >> >>>>
> >>> >> >> >>>> If you can share your flow and example input file volume
> >>> >> >> >>>> (hundreds
> >>> >> >> >>>> of
> >>> >> >> >>>> files were fine in my flow), I may be able to provide more
> >>> >> >> >>>> useful
> >>> >> >> >>>> comment.
> >>> >> >> >>>>
> >>> >> >> >>>> Thanks,
> >>> >> >> >>>> Koji
> >>> >> >> >>>>
> >>> >> >> >>>> On Wed, May 30, 2018 at 2:08 PM, Martijn Dekkers
> >>> >> >> >>>> <ma...@dekkers.org.uk> wrote:
> >>> >> >> >>>> > Hi Koji,
> >>> >> >> >>>> >
> >>> >> >> >>>> > I am seeing many issues to get this to run reliably. When
> >>> >> >> >>>> > running
> >>> >> >> >>>> > this
> >>> >> >> >>>> > with
> >>> >> >> >>>> > a few flowfiles at a time, and stepping through by
> switching
> >>> >> >> >>>> > processors on
> >>> >> >> >>>> > and off it works mostly fine, but running this at volume I
> >>> >> >> >>>> > receive
> >>> >> >> >>>> > many
> >>> >> >> >>>> > errors about "no release signal found"
> >>> >> >> >>>> >
> >>> >> >> >>>> > I have tried to fix this in a few different ways, but the
> >>> >> >> >>>> > issue
> >>> >> >> >>>> > keeps
> >>> >> >> >>>> > coming
> >>> >> >> >>>> > back. This is also not consistent at all - different wait
> >>> >> >> >>>> > processors
> >>> >> >> >>>> > will
> >>> >> >> >>>> > block different flowfiles at different times, without
> >>> >> >> >>>> > changing
> >>> >> >> >>>> > any
> >>> >> >> >>>> > configuration. Stop/Start the flow, and different queues
> will
> >>> >> >> >>>> > fill
> >>> >> >> >>>> > up.
> >>> >> >> >>>> > Do
> >>> >> >> >>>> > you have any ideas what could be causing this behavior? I
> >>> >> >> >>>> > checked
> >>> >> >> >>>> > the
> >>> >> >> >>>> > DistributedMapCache Server/Client components, and they all
> >>> >> >> >>>> > appear
> >>> >> >> >>>> > to
> >>> >> >> >>>> > be
> >>> >> >> >>>> > working OK.
> >>> >> >> >>>> >
> >>> >> >> >>>> > Thanks,
> >>> >> >> >>>> >
> >>> >> >> >>>> > Martijn
> >>> >> >> >>>> >
> >>> >> >> >>>> > On 28 May 2018 at 05:11, Koji Kawamura
> >>> >> >> >>>> > <ij...@gmail.com>
> >>> >> >> >>>> > wrote:
> >>> >> >> >>>> >>
> >>> >> >> >>>> >> Hi Martin,
> >>> >> >> >>>> >>
> >>> >> >> >>>> >> Alternative approach is using Wait/Notify processors.
> >>> >> >> >>>> >> I have developed similar flow using those before, and it
> >>> >> >> >>>> >> will
> >>> >> >> >>>> >> work
> >>> >> >> >>>> >> with your case I believe.
> >>> >> >> >>>> >> A NiFi flow template is available here.
> >>> >> >> >>>> >>
> >>> >> >> >>>> >>
> >>> >> >> >>>> >>
> >>> >> >> >>>> >> https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd
> >>> >> >> >>>> >>
> >>> >> >> >>>> >> Hope this helps,
> >>> >> >> >>>> >> Koji
> >>> >> >> >>>> >>
> >>> >> >> >>>> >>
> >>> >> >> >>>> >> On Sun, May 27, 2018 at 11:48 PM, Andrew Grande
> >>> >> >> >>>> >> <ap...@gmail.com>
> >>> >> >> >>>> >> wrote:
> >>> >> >> >>>> >> > Martijn,
> >>> >> >> >>>> >> >
> >>> >> >> >>>> >> > Here's an idea you could explore. Have the ListFile
> >>> >> >> >>>> >> > processor
> >>> >> >> >>>> >> > work
> >>> >> >> >>>> >> > as
> >>> >> >> >>>> >> > usual
> >>> >> >> >>>> >> > and create a custom component (start with a scripting
> one
> >>> >> >> >>>> >> > to
> >>> >> >> >>>> >> > prototype)
> >>> >> >> >>>> >> > grouping the filenames as needed. I don't know of the
> >>> >> >> >>>> >> > number
> >>> >> >> >>>> >> > of
> >>> >> >> >>>> >> > files in
> >>> >> >> >>>> >> > a
> >>> >> >> >>>> >> > set is different every time, so trying to be more
> robust.
> >>> >> >> >>>> >> >
> >>> >> >> >>>> >> > Once you group and count the set, you can transfer the
> >>> >> >> >>>> >> > names
> >>> >> >> >>>> >> > to
> >>> >> >> >>>> >> > the
> >>> >> >> >>>> >> > success
> >>> >> >> >>>> >> > relationship. Ignore otherwise and wait until the set
> is
> >>> >> >> >>>> >> > full.
> >>> >> >> >>>> >> >
> >>> >> >> >>>> >> > Andrew
> >>> >> >> >>>> >> >
> >>> >> >> >>>> >> >
> >>> >> >> >>>> >> > On Sun, May 27, 2018, 7:29 AM Martijn Dekkers
> >>> >> >> >>>> >> > <ma...@dekkers.org.uk>
> >>> >> >> >>>> >> > wrote:
> >>> >> >> >>>> >> >>
> >>> >> >> >>>> >> >> Hello all,
> >>> >> >> >>>> >> >>
> >>> >> >> >>>> >> >> I am trying to work out an issue with little success.
> >>> >> >> >>>> >> >>
> >>> >> >> >>>> >> >> I need to ingest files generated by some application.
> I
> >>> >> >> >>>> >> >> can
> >>> >> >> >>>> >> >> only
> >>> >> >> >>>> >> >> ingest
> >>> >> >> >>>> >> >> these files when a specific set exists. For example:
> >>> >> >> >>>> >> >>
> >>> >> >> >>>> >> >> file_123_456_ab.ex1
> >>> >> >> >>>> >> >> file_123_456_cd.ex1
> >>> >> >> >>>> >> >> file_123_456_ef.ex1
> >>> >> >> >>>> >> >> file_123_456_gh.ex1
> >>> >> >> >>>> >> >> file_123_456.ex2
> >>> >> >> >>>> >> >>
> >>> >> >> >>>> >> >> Only when a set like that exists should I pick them up
> >>> >> >> >>>> >> >> into
> >>> >> >> >>>> >> >> the
> >>> >> >> >>>> >> >> Flow.
> >>> >> >> >>>> >> >> The
> >>> >> >> >>>> >> >> parts I am looking for to "group" would "ab.ex1",
> >>> >> >> >>>> >> >> "cd.ex1",
> >>> >> >> >>>> >> >> "ef.ex1",
> >>> >> >> >>>> >> >> "gh.ex1", ".ex2".
> >>> >> >> >>>> >> >>
> >>> >> >> >>>> >> >> I tried to do this with some expression, but couldn't
> >>> >> >> >>>> >> >> work
> >>> >> >> >>>> >> >> it
> >>> >> >> >>>> >> >> out.
> >>> >> >> >>>> >> >>
> >>> >> >> >>>> >> >> What would be the best way to achieve this?
> >>> >> >> >>>> >> >>
> >>> >> >> >>>> >> >> Many thanks!
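Andrew's grouping suggestion above can be prototyped outside NiFi first. A minimal sketch in plain Python (not a NiFi scripted processor; the required suffix set is taken from the example filenames just above):

    import re
    from collections import defaultdict

    REQUIRED_SUFFIXES = {'_ab.ex1', '_cd.ex1', '_ef.ex1', '_gh.ex1', '.ex2'}

    def complete_sets(filenames):
        # Group filenames by their 'file_<n>_<n>' prefix and report the groups
        # for which every required suffix has been seen.
        groups = defaultdict(set)
        for name in filenames:
            m = re.match(r'^(file_\d+_\d+)(.*)$', name)
            if m:
                groups[m.group(1)].add(m.group(2))
        return [prefix for prefix, suffixes in groups.items()
                if REQUIRED_SUFFIXES.issubset(suffixes)]

    listing = ['file_123_456_ab.ex1', 'file_123_456_cd.ex1', 'file_123_456_ef.ex1',
               'file_123_456_gh.ex1', 'file_123_456.ex2', 'file_123_457_ab.ex1']
    print(complete_sets(listing))   # only file_123_456 is complete

Only groups that pass this check would be transferred to the success relationship; incomplete groups are simply ignored until the remaining files show up, as Andrew describes.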
> >>> >> >> >>>> >
> >>> >> >> >>>> >
> >>> >> >> >>>
> >>> >> >> >>>
> >>> >> >> >>
> >>> >> >> >
> >>> >> >
> >>> >> >
> >>> >
> >>> >
> >>
> >>
>

Re: Only get file when a set exists.

Posted by Koji Kawamura <ij...@gmail.com>.
JFYI, updated the template on Gist.
https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd

I personally prefer the 2nd work-around below, setting 'Releasable FlowFile
Count' to 0, because the resulting FlowFiles will have more informative
attributes as evidence.


Re: Only get file when a set exists.

Posted by Koji Kawamura <ij...@gmail.com>.
Hi Martijn,

Thanks for sharing new information.
Here are a couple of things to help with debugging.

# Debug Notify branch
1. Stop the Wait branch, to debug the Notify branch in isolation. The Wait
processor deletes the cache entry when it decides there is no need to keep
it any longer.
2. Make sure Notify has 'success' and 'failure' relationships. Connect both
relationships to a LogAttribute (or anything stopped), to keep FlowFiles in
the queue between it and the Notify.
3. Confirm every FlowFile that arrives is passed to the 'success'
relationship. This confirms Notify actually sent every notification with
the expected notification identifier.
4. Check that all expected keys exist in Redis.

# Debug Notify branch 2
If you can't stop the Wait branch for some reason, add a
FetchDistributedMapCache right after Notify. This ensures that a
signal is successfully written in Redis.
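Either way, since the cache here is backed by Redis (see the message quoted below), the stored signal keys can be inspected directly. A minimal sketch assuming the redis-py client and a local, unauthenticated Redis on the default port; the actual key names depend on what the flow uses as the Release Signal Identifier:

    import redis

    r = redis.Redis(host='localhost', port=6379)

    # Dump every key with its raw, serialized value. For Wait/Notify signals the
    # key is the Release Signal Identifier and the value holds the counters.
    for key in r.scan_iter('*'):
        print(key.decode(), '->', r.get(key))

A key that shows up after Notify runs and then disappears before the matching Wait releases anything points at the deletion scenario described below.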

If we can confirm the Notify branch works without issue, then I'd suspect
the key, once written, is somehow being deleted by the Wait processors.

There is a race condition between Wait and Notify.
My hypothesis is:
Assuming 'type1.ext1' is routed to the Wait branch, and the other 7 types
are routed to the Notify branch:
1. Notify receives 'type1.ext2'; the signal becomes {type1.ext2=1}
2. The Wait for 'type1.ext2' gets the signal, which has counts {type1.ext2=1}
3. Simultaneously, the Notify for 'type2.ext1' notifies, and the signal is
updated to {type1.ext2=1, type2.ext1=1}
4. The Wait for 'type1.ext2' processes the signal it read; since the
'type1.ext2' count has reached the target of 1, it decrements that count to
0. It then deletes the key, because from its view the signal no longer has
any count in it.
5. The Wait for 'type2.ext1' fetches the key, but the entry is already
deleted, and the FlowFile gets stuck in the 'wait' relationship.

If that's the case, changing the 'Release Signal Identifier' from groupId to
groupId.type can avoid the conflict.
Alternatively, setting 'Releasable FlowFile Count' to 0 can stop Wait
from deleting the cache key.

Thanks,
Koji
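The race described in the hypothesis above can be sketched as a toy simulation; this is not NiFi code, just the read-modify-write interleaving, with the signal modelled as a dict of counter name to count:

    # Toy model of the suspected Wait/Notify race (hypothetical, heavily simplified).
    cache = {}

    def notify(group, counter):
        signal = cache.setdefault(group, {})
        signal[counter] = signal.get(counter, 0) + 1

    # 1. Notify receives 'type1.ext2'.
    notify('123', 'type1.ext2')

    # 2. The Wait for 'type1.ext2' reads the signal (a stale snapshot from here on).
    snapshot = dict(cache['123'])

    # 3. Meanwhile, the Notify for 'type2.ext1' updates the live signal.
    notify('123', 'type2.ext1')

    # 4. The Wait releases its FlowFile, decrements its counter in the stale
    #    snapshot, sees no counts left, and deletes the whole key.
    snapshot['type1.ext2'] -= 1
    if not any(snapshot.values()):
        del cache['123']

    # 5. The Wait for 'type2.ext1' now finds nothing; that FlowFile stays in 'wait'.
    print(cache.get('123'))   # None: the type2.ext1 notification was lost

Setting 'Releasable FlowFile Count' to 0 removes the delete in step 4, which is what the second work-around above relies on; using groupId.type as the Release Signal Identifier gives each Wait/Notify pair its own key, so there is nothing to clobber.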



On Tue, Jun 5, 2018 at 9:23 PM, Martijn Dekkers <ma...@dekkers.org.uk> wrote:
> Hi Koji,
>
> Some more information from debugging.
>
> I have today deployed Redis since that gives me an easy interface to check
> the existence of keys, and found that for files that end up stuck in the
> wait queues, the provenance in the Notify queue shows the relevant flowfile
> as having arrived, but the relevant key in Redis shows as (nil)
>
> Files that have been processed successfully show a "good" key in Redis.
>
> Thanks,
>
> Martijn
>
> On 5 June 2018 at 06:27, Martijn Dekkers <ma...@dekkers.org.uk> wrote:
>>
>> Hello Koji,
>>
>> Many thanks, apologies for the delay in responding - I had to work on some
>> different tasks.
>>
>> I have followed your advice and have configured a flow accordingly, and on
>> the whole the logic works. However, I still see the issue where a set will
>> be stuck in the wait queue. I have tracked it down to the instance where
>> there is a longer delay between the arrival of ext1 and ext2 files. If I
>> pause the appropriate processor that gates the ext2 files, that set will get
>> stuck. If all files come through roughly at a similar time, we see no
>> issues, and the flow happily runs.
>>
>> I am not quite sure about the best way to debug this. I have looked at the
>> attributes in provenance, and notice that the relevant counter for the
>> specific wait processor isn't updated. I am not sure how I can check the
>> status of the distributed map cache to see if this might be responsible.
>>
>> I can share my flowfile, but would have to email it to you directly,
>> unfortunately I cannot share the flowfile publicly, and sanitising it to the
>> extent that I can publicly share it would be difficult.
>>
>> Oh, we are using 1.6
>>
>> Many thanks,
>>
>> Martijn
>>
>> On 31 May 2018 at 09:57, Koji Kawamura <ij...@gmail.com> wrote:
>>>
>>> BTW, which version are you using? I hope it is 1.4.0 or higher. There
>>> was an issue having effects to your usage.
>>> https://issues.apache.org/jira/browse/NIFI-4028
>>>
>>> On Thu, May 31, 2018 at 4:51 PM, Koji Kawamura <ij...@gmail.com>
>>> wrote:
>>> > HI Martijn,
>>> >
>>> > I used the filename increment pattern based on your first filename
>>> > example.
>>> > file_123_456_ab.ex1
>>> > I increment the 456 part. If it changed, that's fine.
>>> >
>>> > Your current configurations look like below:
>>> > - Given a filename: file_123_type3.ext1
>>> > - matched with a RegEx: ^file_(\d{3}_)(\w+)\.ext1$
>>> > - groupID will be: 123_ (including the underscore)
>>> > - counterName will be: type3
>>> >
>>> > I was suggesting include the extension to the counterName.
>>> > How about changing the RegEx as:
>>> > - RegEx: ^file_(\d+)_(\w+\.ext\d)$
>>> > - groupID will be: 123
>>> > - counterName will be: type3.ext1
>>> >
>>> > Then you can route type1.ext1 to Wait branch and other 7 to Notify.
>>> > In Wait branch, you need 7 Wait processors.
>>> >
>>> > It would fast to debug if you can share your flow template..
>>> >
>>> > Thanks,
>>> > Koji
>>> >
>>> > On Thu, May 31, 2018 at 3:15 PM, Martijn Dekkers
>>> > <ma...@dekkers.org.uk> wrote:
>>> >> Thank you Koji,
>>> >>
>>> >> I have tried once again, using your updated example. Unfortunately,
>>> >> things
>>> >> still get stuck at the first Wait processors' wait queue.
>>> >> I did notice that the format of the files your example generates is
>>> >> different. I will try to clarify:
>>> >>
>>> >> - 8 files in total:
>>> >>
>>> >> -- file_123_type1.ext1
>>> >> -- file_123_type1.ext2
>>> >>
>>> >> -- file_123_type2.ext1
>>> >> -- file_123_type2.ext2
>>> >>
>>> >> -- file_123_type3.ext1
>>> >> -- file_123_type3.ext2
>>> >>
>>> >> -- file_123_type4.ext1
>>> >> -- file_123_type4.ext2
>>> >>
>>> >> For each set of 8 files, "file_123" increments, so the first set of 8
>>> >> is
>>> >> "file_123", and the next set is "file_124" and so on.
>>> >>
>>> >> When I look at your example, I notice that at the final step
>>> >> (LogAttribute
>>> >> after the FetchFile set) the filenames are file_123_<incrementing
>>> >> number>.ex(1|2)
>>> >>
>>> >> My UpdateAttribute before the Notify branch is configured as:
>>> >> groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')}
>>> >> counterName -
>>> >> ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$2')}
>>> >>
>>> >> The UpdateAttribute before the Wait branch is configured as:
>>> >> groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')}
>>> >>
>>> >> The 4 Wait processors in the Wait branch are configured as:
>>> >> All Wait processors:
>>> >> Release Signal Identifier - ${groupID}
>>> >>
>>> >> For each individual Wait processor:
>>> >> Signal Counter Name - type1
>>> >> Signal Counter Name - type2
>>> >> Signal Counter Name - type3
>>> >> Signal Counter Name - type4
>>> >>
>>> >> I am a bit stumped. The best success we had was a configuration with a
>>> >> RouteOnAttribute sending each of type1|type2|type3|type4 to their own
>>> >> Wait
>>> >> processor, and a similar config for the Notify branch, followed by a
>>> >> final
>>> >> Wait/Notify pair that simply ensures we have the correct amount of
>>> >> sets.
>>> >>
>>> >> This configuration did exactly what we want, but unfortunately we had
>>> >> random
>>> >> flowfiles stuck in the waitqueue for no apparent reason.
>>> >>
>>> >> Thanks,
>>> >>
>>> >> Martijn
>>> >>
>>> >>
>>> >>
>>> >> On 31 May 2018 at 05:23, Koji Kawamura <ij...@gmail.com> wrote:
>>> >>>
>>> >>> The order of arrival does not matter.
>>> >>>
>>> >>> Wait processor has 'Expiration Duration' configuration, defaults to
>>> >>> 10
>>> >>> min. Please adjust it according to your needs, the longest period to
>>> >>> wait for a delayed file.
>>> >>> If a FlowFile exceeds the duration, it will be sent to 'expired'
>>> >>> relationship, and can be treated differently, e.g. write ERROR log
>>> >>>
>>> >>> > If we have a longer wait for a file, we'd like processing for the
>>> >>> > next
>>> >>> > groupid to still be able to continue.
>>> >>>
>>> >>> In order to achieve that, use Wait Mode = 'Transfer to wait
>>> >>> relationship', and the 'wait' relationship should be configured to
>>> >>> use
>>> >>> FirstInFirstOutPrioritizer.
>>> >>> If FirstInFirstOutPrioritizer is not set, the same FlowFile will be
>>> >>> processed again while it blocks other FlowFiles.
>>> >>> With FirstInFirstOutPrioritizer, the processed FlowFile will be
>>> >>> re-queued at the end of wait queue.
>>> >>>
>>> >>> I've updated my example to make it more realistic, by adding delay
>>> >>> for
>>> >>> certain set and type.
>>> >>> https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd
>>> >>>
>>> >>> Thanks,
>>> >>> Koji
>>> >>>
>>> >>> On Thu, May 31, 2018 at 10:56 AM, Martijn Dekkers
>>> >>> <ma...@dekkers.org.uk> wrote:
>>> >>> > Cool, that will make things a lot simpler. Does it matter that the
>>> >>> > ext2
>>> >>> > files arrive in random order? Sometimes there can be a very long
>>> >>> > delay
>>> >>> > in
>>> >>> > some of them showing up, and we have some concerns about the
>>> >>> > overall
>>> >>> > flow
>>> >>> > blocking. If we have a longer wait for a file, we'd like processing
>>> >>> > for
>>> >>> > the
>>> >>> > next groupid to still be able to continue.
>>> >>> >
>>> >>> > Thank you for your help (and for writing Wait/Notify!!)
>>> >>> >
>>> >>> > Martijn
>>> >>> >
>>> >>> > On 31 May 2018 at 03:49, Koji Kawamura <ij...@gmail.com>
>>> >>> > wrote:
>>> >>> >>
>>> >>> >> Glad to hear that was helpful.
>>> >>> >>
>>> >>> >> "4 same type for each extension", can be treated as "8 distinct
>>> >>> >> types"
>>> >>> >> if an extension is included in a type.
>>> >>> >> ab.ex1, cd.ex1, ef.ex1, gh.ex1, ab.ex2, cd.ex2, ef.ex2, gh.ex2
>>> >>> >>
>>> >>> >> Then route only 'ab.ex1' (or whichever but just 1 of them) to the
>>> >>> >> Wait
>>> >>> >> branch, and the rest to Notify branch.
>>> >>> >> That will simplify the flow, if I'm not missing any other
>>> >>> >> requirement.
>>> >>> >>
>>> >>> >> Thanks!
>>> >>> >> Koji
>>> >>> >>
>>> >>> >> On Thu, May 31, 2018 at 10:30 AM, Martijn Dekkers
>>> >>> >> <ma...@dekkers.org.uk> wrote:
>>> >>> >> > Hi Koji, Many thanks for your continued assistance!
>>> >>> >> >
>>> >>> >> >>
>>> >>> >> >> - 1 file per second is relatively low in terms of traffic, it
>>> >>> >> >> should
>>> >>> >> >> be processed fine with 1 thread
>>> >>> >> >> - A flow like this, which is stateful across different parts of
>>> >>> >> >> the
>>> >>> >> >> flow works at best with single thread, because using multiple
>>> >>> >> >> threads
>>> >>> >> >> would cause race condition or concurrency issue if there's any
>>> >>> >> >> implementation error
>>> >>> >> >
>>> >>> >> >
>>> >>> >> > Yes, we had similar thoughts.
>>> >>> >> >
>>> >>> >> >>
>>> >>> >> >> - Based on above, I strongly recommend to NOT increase
>>> >>> >> >> "concurrent
>>> >>> >> >> tasks". If you see FlowFiles staying in a wait queue, then
>>> >>> >> >> there
>>> >>> >> >> must
>>> >>> >> >> be different issue
>>> >>> >> >
>>> >>> >> >
>>> >>> >> > We don't see many flowfiles stuck in a wait queue, I ran a test
>>> >>> >> > over
>>> >>> >> > a
>>> >>> >> > few
>>> >>> >> > hours yesterday that simulates the way in which these files
>>> >>> >> > would
>>> >>> >> > appear
>>> >>> >> > (we
>>> >>> >> > would have 4 of "ext1" show up every second, and the "ext2" can
>>> >>> >> > show
>>> >>> >> > up
>>> >>> >> > a
>>> >>> >> > few seconds later, and not always in the same order) and we
>>> >>> >> > found
>>> >>> >> > perhaps 6
>>> >>> >> > flowfiles stuck in a wait queue.
>>> >>> >> >
>>> >>> >> >>
>>> >>> >> >> - Also, using concurrent tasks number like 400 is too much in
>>> >>> >> >> general
>>> >>> >> >> for all processors. I recommend to increment it as 2, 3, 4 ..
>>> >>> >> >> up to
>>> >>> >> >> 8
>>> >>> >> >> or so, only if you see the clear benefit by doing so
>>> >>> >> >
>>> >>> >> >
>>> >>> >> > Indeed, thanks for the suggestion. Once we have the logic
>>> >>> >> > finished
>>> >>> >> > and
>>> >>> >> > tested we will have to optimise this Flow. The next step is to
>>> >>> >> > try to
>>> >>> >> > load
>>> >>> >> > the required processors into MiNiFy, as this will be running on
>>> >>> >> > many
>>> >>> >> > systems
>>> >>> >> > with limited capacity. If we don't manage with MiNiFy, we will
>>> >>> >> > still
>>> >>> >> > be
>>> >>> >> > good, but we prefer to have the smaller footprint and ease of
>>> >>> >> > management
>>> >>> >> > we
>>> >>> >> > can obtain with MiNiFy.
>>> >>> >> >
>>> >>> >> >>
>>> >>> >> >> - The important part of this flow is extracting 'groupId' and
>>> >>> >> >> 'type'
>>> >>> >> >> from file names. Regular Expression needs to be configured
>>> >>> >> >> properly.
>>> >>> >> >> - I recommend using https://regex101.com/ to test your Regular
>>> >>> >> >> Expression to see whether it can extract correct groupId and
>>> >>> >> >> type
>>> >>> >> >
>>> >>> >> >
>>> >>> >> > Yes, we have tested our RegExes for this extensively
>>> >>> >> >
>>> >>> >> >>
>>> >>> >> >>
>>> >>> >> >> Lastly, regardless of how many files should be there for 'ext1'
>>> >>> >> >> and
>>> >>> >> >> 'ext2', the flow structure is simple as below.
>>> >>> >> >> Let's say there should be 8 files to start processing those.
>>> >>> >> >> 4 x ex1, and 4 ex2 in your case, but let's think it as 8 file
>>> >>> >> >> types.
>>> >>> >> >> And I assume the types are known, meaning, static, not
>>> >>> >> >> dynamically
>>> >>> >> >> change.
>>> >>> >> >
>>> >>> >> >
>>> >>> >> > Correct, the format is <groupID><type>.<ext> where:
>>> >>> >> >
>>> >>> >> > - groupId is unique for each set of 8
>>> >>> >> > - type has 4 variants (ab, cd, ef, gh), the same type-set for
>>> >>> >> > each
>>> >>> >> > ext
>>> >>> >> >
>>> >>> >> >> So, the rule is, "a set of files consists of 8 files, and a set
>>> >>> >> >> should
>>> >>> >> >> wait to be processed until all 8 files are ready", that's all.
>>> >>> >> >
>>> >>> >> >
>>> >> > For our use case it is important that we have exact, positive
>>> >> > identification of each file.
>>> >>> >> >
>>> >>> >> >>
>>> >>> >> >> Then, the flow should be designed like below:
>>> >>> >> >> 1. List files, each file will be sent as a FlowFile
>>> >>> >> >
>>> >>> >> >
>>> >>> >> > Correct - we have several different listfiles for other sections
>>> >>> >> > of
>>> >>> >> > the
>>> >>> >> > flow, we are actually collecting many different sets, all
>>> >>> >> > variants of
>>> >>> >> > the
>>> >>> >> > above. However, those are far simpler (sets of 2 - ext1 and ext2
>>> >>> >> > only)
>>> >>> >> >
>>> >>> >> >>
>>> >>> >> >> 2. Extract groupId and type from filename
>>> >>> >> >
>>> >>> >> >
>>> >>> >> > Correct
>>> >>> >> >
>>> >>> >> >>
>>> >>> >> >> 3. Route FlowFiles into two branches, let's call these 'Notify'
>>> >>> >> >> branch
>>> >>> >> >> and 'Wait' branch, and pass only 1 type for a set to
>>> >>> >> >> Wait-branch,
>>> >>> >> >> and
>>> >>> >> >> the rest 7 types to Notify-branch
>>> >>> >> >
>>> >>> >> >
>>> >>> >> > OK, we currently split Notify branch to "all ext1" and wait
>>> >>> >> > branch to
>>> >>> >> > "all
>>> >>> >> > ext2"
>>> >>> >> >
>>> >>> >> >>
>>> >>> >> >> At Notify branch (for the rest 7 types FlowFile, e.g. type 2,
>>> >>> >> >> 3, 4
>>> >>> >> >> ...
>>> >>> >> >> 8)
>>> >>> >> >
>>> >>> >> >
>>> >>> >> > As mentioned, we only have 4 distinct types.
>>> >>> >> >
>>> >>> >> >>
>>> >>> >> >> 1. Notify that the type for a group has arrived.
>>> >>> >> >> 2. Discard the FlowFile, because there's nothing to do with it
>>> >>> >> >> in
>>> >>> >> >> this
>>> >>> >> >> branch
>>> >>> >> >
>>> >>> >> >
>>> >>> >> >
>>> >>> >> >>
>>> >>> >> >> At Wait branch (for the type 1 FlowFile)
>>> >>> >> >> 1. Wait for type 2 for the groupId.
>>> >>> >> >> 2. Wait for type 3 for the groupId, type 4, 5 and so on
>>> >>> >> >> 3. After passing Wait for type 8, it can guarantee that all 8
>>> >>> >> >> files
>>> >>> >> >> are available (unless there is any other program deleting
>>> >>> >> >> those)
>>> >>> >> >> 4. Get actual file content using FetchFile, and process it
>>> >>> >> >
>>> >>> >> >
>>> >>> >> > Besides the "4 same types for each extension", this is
>>> >>> >> > configured as
>>> >>> >> > you
>>> >>> >> > describe.
>>> >>> >> >
>>> >>> >> >>
>>> >>> >> >> I hope it helps.
>>> >>> >> >>
>>> >>> >> >
>>> >>> >> > It does, thanks. I will extract this portion of the flow,
>>> >>> >> > sanitise,
>>> >>> >> > and
>>> >>> >> > send
>>> >>> >> > it along - easier to see than to describe :)
>>> >>> >> >
>>> >>> >> >
>>> >>> >> >>
>>> >>> >> >> Thanks,
>>> >>> >> >> Koji
>>> >>> >> >
>>> >>> >> >
>>> >>> >> > Thank you so much once again!
>>> >>> >> >
>>> >>> >> > Martijn
>>> >>> >> >
>>> >>> >> >
>>> >>> >> >
>>> >>> >> >>
>>> >>> >> >>
>>> >>> >> >> On Wed, May 30, 2018 at 6:10 PM, Martijn Dekkers
>>> >>> >> >> <ma...@dekkers.org.uk>
>>> >>> >> >> wrote:
>>> >>> >> >> > Hey Pierre,
>>> >>> >> >> >
>>> >>> >> >> > Yes, we suspected as much, but we are only seeing this with
>>> >>> >> >> > the
>>> >>> >> >> > Wait
>>> >>> >> >> > processor. Possibly because that is the only "blocking" we
>>> >>> >> >> > have in
>>> >>> >> >> > this
>>> >>> >> >> > flow.
>>> >>> >> >> >
>>> >>> >> >> > Thanks for the clarification, much appreciated!
>>> >>> >> >> >
>>> >>> >> >> > Martijn
>>> >>> >> >> >
>>> >>> >> >> > On 30 May 2018 at 10:30, Pierre Villard
>>> >>> >> >> > <pi...@gmail.com>
>>> >>> >> >> > wrote:
>>> >>> >> >> >>
>>> >>> >> >> >> I'll let Koji give more information about the Wait/Notify,
>>> >>> >> >> >> he is
>>> >>> >> >> >> clearly
>>> >>> >> >> >> the expert here.
>>> >>> >> >> >>
>>> >>> >> >> >> I'm just jumping in regarding your "and when viewing the
>>> >>> >> >> >> queue,
>>> >>> >> >> >> the
>>> >>> >> >> >> dialog
>>> >>> >> >> >> states that the queue is empty.". You're seeing this
>>> >>> >> >> >> behavior
>>> >>> >> >> >> because,
>>> >>> >> >> >> even
>>> >>> >> >> >> though the UI shows some flow files in the queue, the flow
>>> >>> >> >> >> files
>>> >>> >> >> >> are
>>> >>> >> >> >> currently locked in the session of the running processor and
>>> >>> >> >> >> you
>>> >>> >> >> >> won't
>>> >>> >> >> >> see
>>> >>> >> >> >> flow files currently processed in a session when listing a
>>> >>> >> >> >> queue.
>>> >>> >> >> >> If
>>> >>> >> >> >> you
>>> >>> >> >> >> stop the processor, the session will be closed and you'll be
>>> >>> >> >> >> able
>>> >>> >> >> >> to
>>> >>> >> >> >> list
>>> >>> >> >> >> the queue and see the flow files.
>>> >>> >> >> >>
>>> >>> >> >> >> I recall discussions in the past to improve the UX for this.
>>> >>> >> >> >> Not
>>> >>> >> >> >> sure
>>> >>> >> >> >> we
>>> >>> >> >> >> have a JIRA for it though...
>>> >>> >> >> >>
>>> >>> >> >> >> Pierre
>>> >>> >> >> >>
>>> >>> >> >> >> 2018-05-30 8:26 GMT+02:00 Martijn Dekkers
>>> >>> >> >> >> <ma...@dekkers.org.uk>:
>>> >>> >> >> >>>
>>> >>> >> >> >>> Hi Koji,
>>> >>> >> >> >>>
>>> >>> >> >> >>> Thank you for responding. I had adjusted the run schedule
>>> >>> >> >> >>> to
>>> >>> >> >> >>> closely
>>> >>> >> >> >>> mimic our environment. We are expecting about 1 file per
>>> >>> >> >> >>> second
>>> >>> >> >> >>> or
>>> >>> >> >> >>> so.
>>> >>> >> >> >>> We are also seeing some random "orphans" sitting in a wait
>>> >>> >> >> >>> queue
>>> >>> >> >> >>> every
>>> >>> >> >> >>> now and again that don't trigger a debug message, and when
>>> >>> >> >> >>> viewing
>>> >>> >> >> >>> the
>>> >>> >> >> >>> queue, the dialog states that the queue is empty.
>>> >>> >> >> >>>
>>> >>> >> >> >>> We found the random "no signal found" issue to be
>>> >>> >> >> >>> significantly
>>> >>> >> >> >>> decreased
>>> >>> >> >> >>> when we increase the "concurrent tasks" to something large
>>> >>> >> >> >>> -
>>> >>> >> >> >>> currently
>>> >>> >> >> >>> set
>>> >>> >> >> >>> to 400 for all wait and notify processors.
>>> >>> >> >> >>>
>>> >>> >> >> >>> I do need to mention that our requirements had changed
>>> >>> >> >> >>> since you
>>> >>> >> >> >>> made
>>> >>> >> >> >>> the
>>> >>> >> >> >>> template, in that we are looking for a set of 8 files - 4 x
>>> >>> >> >> >>> "ext1"
>>> >>> >> >> >>> and
>>> >>> >> >> >>> 4 x
>>> >>> >> >> >>> "ext2" both with the same pattern: <groupid><type (4 of
>>> >>> >> >> >>> these)>.ext1
>>> >>> >> >> >>> or ext2
>>> >>> >> >> >>>
>>> >>> >> >> >>> We found that the best way to make this work was to add
>>> >>> >> >> >>> another
>>> >>> >> >> >>> wait/notify pair, each processor coming after the ones
>>> >>> >> >> >>> already
>>> >>> >> >> >>> there,
>>> >>> >> >> >>> with a
>>> >>> >> >> >>> simple counter against the groupID.
>>> >>> >> >> >>>
>>> >>> >> >> >>> I will export a template for you, many thanks for your help
>>> >>> >> >> >>> - I
>>> >>> >> >> >>> just
>>> >>> >> >> >>> need
>>> >> >> >>> to spend some time sanitising the various fields etc.
>>> >>> >> >> >>>
>>> >>> >> >> >>> Many thanks once again for your kind assistance.
>>> >>> >> >> >>>
>>> >>> >> >> >>> Martijn
>>> >>> >> >> >>>
>>> >>> >> >> >>> On 30 May 2018 at 08:14, Koji Kawamura
>>> >>> >> >> >>> <ij...@gmail.com>
>>> >>> >> >> >>> wrote:
>>> >>> >> >> >>>>
>>> >> >> >>>> Hi Martijn,
>>> >>> >> >> >>>>
>>> >>> >> >> >>>> In my template, I was using 'Run Schedule' as '5 secs' for
>>> >>> >> >> >>>> the
>>> >>> >> >> >>>> Wait
>>> >>> >> >> >>>> processors to avoid overusing CPU resource. However, if
>>> >>> >> >> >>>> you
>>> >>> >> >> >>>> expect
>>> >>> >> >> >>>> more throughput, it should be lowered.
>>> >>> >> >> >>>> Changed Run Schedule to 0 sec, and I passed 100 group of
>>> >>> >> >> >>>> files
>>> >>> >> >> >>>> (400
>>> >>> >> >> >>>> files because 4 files are 1 set in my example), they
>>> >>> >> >> >>>> reached to
>>> >>> >> >> >>>> the
>>> >>> >> >> >>>> expected goal of the flow without issue.
>>> >>> >> >> >>>>
>>> >>> >> >> >>>> If you can share your flow and example input file volume
>>> >>> >> >> >>>> (hundreds
>>> >>> >> >> >>>> of
>>> >>> >> >> >>>> files were fine in my flow), I may be able to provide more
>>> >>> >> >> >>>> useful
>>> >>> >> >> >>>> comment.
>>> >>> >> >> >>>>
>>> >>> >> >> >>>> Thanks,
>>> >>> >> >> >>>> Koji
>>> >>> >> >> >>>>
>>> >>> >> >> >>>> On Wed, May 30, 2018 at 2:08 PM, Martijn Dekkers
>>> >>> >> >> >>>> <ma...@dekkers.org.uk> wrote:
>>> >>> >> >> >>>> > Hi Koji,
>>> >>> >> >> >>>> >
>>> >>> >> >> >>>> > I am seeing many issues to get this to run reliably.
>>> >>> >> >> >>>> > When
>>> >>> >> >> >>>> > running
>>> >>> >> >> >>>> > this
>>> >>> >> >> >>>> > with
>>> >>> >> >> >>>> > a few flowfiles at a time, and stepping through by
>>> >>> >> >> >>>> > switching
>>> >>> >> >> >>>> > processors on
>>> >>> >> >> >>>> > and off it works mostly fine, but running this at volume
>>> >>> >> >> >>>> > I
>>> >>> >> >> >>>> > receive
>>> >>> >> >> >>>> > many
>>> >>> >> >> >>>> > errors about "no release signal found"
>>> >>> >> >> >>>> >
>>> >>> >> >> >>>> > I have tried to fix this in a few different ways, but
>>> >>> >> >> >>>> > the
>>> >>> >> >> >>>> > issue
>>> >>> >> >> >>>> > keeps
>>> >>> >> >> >>>> > coming
>>> >>> >> >> >>>> > back. This is also not consistent at all - different
>>> >>> >> >> >>>> > wait
>>> >>> >> >> >>>> > processors
>>> >>> >> >> >>>> > will
>>> >>> >> >> >>>> > block different flowfiles at different times, without
>>> >>> >> >> >>>> > changing
>>> >>> >> >> >>>> > any
>>> >>> >> >> >>>> > configuration. Stop/Start the flow, and different queues
>>> >>> >> >> >>>> > will
>>> >>> >> >> >>>> > fill
>>> >>> >> >> >>>> > up.
>>> >>> >> >> >>>> > Do
>>> >>> >> >> >>>> > you have any ideas what could be causing this behavior?
>>> >>> >> >> >>>> > I
>>> >>> >> >> >>>> > checked
>>> >>> >> >> >>>> > the
>>> >>> >> >> >>>> > DistributedMapCache Server/Client components, and they
>>> >>> >> >> >>>> > all
>>> >>> >> >> >>>> > appear
>>> >>> >> >> >>>> > to
>>> >>> >> >> >>>> > be
>>> >>> >> >> >>>> > working OK.
>>> >>> >> >> >>>> >
>>> >>> >> >> >>>> > Thanks,
>>> >>> >> >> >>>> >
>>> >>> >> >> >>>> > Martijn
>>> >>> >> >> >>>> >
>>> >>> >> >> >>>> > On 28 May 2018 at 05:11, Koji Kawamura
>>> >>> >> >> >>>> > <ij...@gmail.com>
>>> >>> >> >> >>>> > wrote:
>>> >>> >> >> >>>> >>
>>> >>> >> >> >>>> >> Hi Martin,
>>> >>> >> >> >>>> >>
>>> >>> >> >> >>>> >> Alternative approach is using Wait/Notify processors.
>>> >>> >> >> >>>> >> I have developed similar flow using those before, and
>>> >>> >> >> >>>> >> it
>>> >>> >> >> >>>> >> will
>>> >>> >> >> >>>> >> work
>>> >>> >> >> >>>> >> with your case I believe.
>>> >>> >> >> >>>> >> A NiFi flow template is available here.
>>> >>> >> >> >>>> >>
>>> >>> >> >> >>>> >>
>>> >>> >> >> >>>> >>
>>> >>> >> >> >>>> >>
>>> >>> >> >> >>>> >> https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd
>>> >>> >> >> >>>> >>
>>> >>> >> >> >>>> >> Hope this helps,
>>> >>> >> >> >>>> >> Koji
>>> >>> >> >> >>>> >>
>>> >>> >> >> >>>> >>
>>> >>> >> >> >>>> >> On Sun, May 27, 2018 at 11:48 PM, Andrew Grande
>>> >>> >> >> >>>> >> <ap...@gmail.com>
>>> >>> >> >> >>>> >> wrote:
>>> >>> >> >> >>>> >> > Martijn,
>>> >>> >> >> >>>> >> >
>>> >>> >> >> >>>> >> > Here's an idea you could explore. Have the ListFile
>>> >>> >> >> >>>> >> > processor
>>> >>> >> >> >>>> >> > work
>>> >>> >> >> >>>> >> > as
>>> >>> >> >> >>>> >> > usual
>>> >>> >> >> >>>> >> > and create a custom component (start with a scripting
>>> >>> >> >> >>>> >> > one
>>> >>> >> >> >>>> >> > to
>>> >>> >> >> >>>> >> > prototype)
>>> >>> >> >> >>>> >> > grouping the filenames as needed. I don't know of the
>>> >>> >> >> >>>> >> > number
>>> >>> >> >> >>>> >> > of
>>> >>> >> >> >>>> >> > files in
>>> >>> >> >> >>>> >> > a
>>> >>> >> >> >>>> >> > set is different every time, so trying to be more
>>> >>> >> >> >>>> >> > robust.
>>> >>> >> >> >>>> >> >
>>> >>> >> >> >>>> >> > Once you group and count the set, you can transfer
>>> >>> >> >> >>>> >> > the
>>> >>> >> >> >>>> >> > names
>>> >>> >> >> >>>> >> > to
>>> >>> >> >> >>>> >> > the
>>> >>> >> >> >>>> >> > success
>>> >>> >> >> >>>> >> > relationship. Ignore otherwise and wait until the set
>>> >>> >> >> >>>> >> > is
>>> >>> >> >> >>>> >> > full.
>>> >>> >> >> >>>> >> >
>>> >>> >> >> >>>> >> > Andrew
>>> >>> >> >> >>>> >> >
>>> >>> >> >> >>>> >> >
>>> >>> >> >> >>>> >> > On Sun, May 27, 2018, 7:29 AM Martijn Dekkers
>>> >>> >> >> >>>> >> > <ma...@dekkers.org.uk>
>>> >>> >> >> >>>> >> > wrote:
>>> >>> >> >> >>>> >> >>
>>> >>> >> >> >>>> >> >> Hello all,
>>> >>> >> >> >>>> >> >>
>>> >>> >> >> >>>> >> >> I am trying to work out an issue with little
>>> >>> >> >> >>>> >> >> success.
>>> >>> >> >> >>>> >> >>
>>> >>> >> >> >>>> >> >> I need to ingest files generated by some
>>> >>> >> >> >>>> >> >> application. I
>>> >>> >> >> >>>> >> >> can
>>> >>> >> >> >>>> >> >> only
>>> >>> >> >> >>>> >> >> ingest
>>> >>> >> >> >>>> >> >> these files when a specific set exists. For example:
>>> >>> >> >> >>>> >> >>
>>> >>> >> >> >>>> >> >> file_123_456_ab.ex1
>>> >>> >> >> >>>> >> >> file_123_456_cd.ex1
>>> >>> >> >> >>>> >> >> file_123_456_ef.ex1
>>> >>> >> >> >>>> >> >> file_123_456_gh.ex1
>>> >>> >> >> >>>> >> >> file_123_456.ex2
>>> >>> >> >> >>>> >> >>
>>> >>> >> >> >>>> >> >> Only when a set like that exists should I pick them
>>> >>> >> >> >>>> >> >> up
>>> >>> >> >> >>>> >> >> into
>>> >>> >> >> >>>> >> >> the
>>> >>> >> >> >>>> >> >> Flow.
>>> >>> >> >> >>>> >> >> The
>>> >>> >> >> >>>> >> >> parts I am looking for to "group" would "ab.ex1",
>>> >>> >> >> >>>> >> >> "cd.ex1",
>>> >>> >> >> >>>> >> >> "ef.ex1",
>>> >>> >> >> >>>> >> >> "gh.ex1", ".ex2".
>>> >>> >> >> >>>> >> >>
>>> >>> >> >> >>>> >> >> I tried to do this with some expression, but
>>> >>> >> >> >>>> >> >> couldn't
>>> >>> >> >> >>>> >> >> work
>>> >>> >> >> >>>> >> >> it
>>> >>> >> >> >>>> >> >> out.
>>> >>> >> >> >>>> >> >>
>>> >>> >> >> >>>> >> >> What would be the best way to achieve this?
>>> >>> >> >> >>>> >> >>
>>> >>> >> >> >>>> >> >> Many thanks!
>>> >>> >> >> >>>> >
>>> >>> >> >> >>>> >
>>> >>> >> >> >>>
>>> >>> >> >> >>>
>>> >>> >> >> >>
>>> >>> >> >> >
>>> >>> >> >
>>> >>> >> >
>>> >>> >
>>> >>> >
>>> >>
>>> >>
>>
>>
>
