Posted to users@nifi.apache.org by Matt Burgess <ma...@apache.org> on 2017/02/13 18:24:42 UTC

Re: RemoveDistributedMapCache

Carlos,

With a RemoveDistributedMapCache processor in your suggested flow,
there might be an issue depending on when the duplicates are routed
off.  For example, if the first time we see the table name, that flow
file gets all the way through to RemoveDistributedMapCache before a
duplicate has been detected by DetectDuplicate, then the cache entry
would be removed and you could process the same table twice. I guess
the question here is: how do you know when you're "done" with the
cache value?

Also FWIW, speaking of my Groovy DCache script, you can use it (or
parts of it) in an ExecuteScript processor to emulate the
functionality of a RemoveDistributedMapCache processor.
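The race described above can be made concrete with a toy model (plain Python, not NiFi code: the dict below stands in for the DistributedMapCache server, and the function names are invented for illustration):

```python
# Toy model of the race: a dict stands in for the DistributedMapCache server.
cache = {}

def detect_duplicate(table):
    """Models DetectDuplicate: first sighting caches the key; repeats are duplicates."""
    if table in cache:
        return True          # duplicate -> would be routed off
    cache[table] = "seen"
    return False

def remove_distributed_map_cache(table):
    """Models the proposed RemoveDistributedMapCache processor."""
    cache.pop(table, None)

processed = []

# Flow file 1 for TABLE_A runs all the way through, including the remove...
assert not detect_duplicate("TABLE_A")
processed.append("TABLE_A")
remove_distributed_map_cache("TABLE_A")

# ...before flow file 2 (a duplicate) reaches DetectDuplicate. The cache entry
# is already gone, so the duplicate is NOT detected and the table runs twice.
assert not detect_duplicate("TABLE_A")
processed.append("TABLE_A")
```

After this sequence, `processed` holds the same table twice, which is exactly the hazard of removing the entry before you know you are "done" with it.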

Regards,
Matt


On Mon, Feb 13, 2017 at 12:54 PM, Carlos Manuel Fernandes (DSI)
<ca...@cgd.pt> wrote:
> Hello,
>
>
>
> I'm using NiFi to replicate tables from one relational database
> (mainframe) to another database, with incremental updates based on a
> timestamp and primary key. The process is made with three custom processors:
> GenerateListOfTablesToSyncronize -> CreteTableIfNotExists ->
> IncrementalLoadData.  If, by mistake, GenerateListOfTablesToSyncronize
> generates the same table twice, I must guarantee the two flows run
> sequentially, not in parallel. For that, I need some kind of lock, and the
> MapCache processors seem to be the solution.  The solution I see is:
>
>
>
> GenerateListOfTablesToSyncronize -> DetectDuplicate (tableName, with no
> age off) -> CreteTableIfNotExists -> IncrementalLoadData ->
> RemoveDistributedMapCache (tableName)
>
>
>
> Unfortunately there is no RemoveDistributedMapCache processor. I could
> handle this thanks to Matt Burgess
> (https://community.hortonworks.com/articles/71837/working-with-a-nifi-distributedmapcache.html),
> which makes it possible to manipulate the cache directly using Groovy.
> Has no one else had this kind of requirement, enough to justify creating
> RemoveDistributedMapCache?
>
>
>
> Thanks
>
>
>
> Carlos

RE: RemoveDistributedMapCache

Posted by "Carlos Manuel Fernandes (DSI)" <ca...@cgd.pt>.
Matt,

Yes, I have the "duplicate" relationship routed back to DetectDuplicate, and yes, I set Max Concurrent Tasks on IncrementalLoadData to something greater than 1, permitting tables to be processed in "parallel".  I want to process many flows with a specific tableName, as long as I don't process them in "parallel", because IncrementalLoadData has three steps that must be atomic: "read the max(timestamp) in the target table" + "read data from source where timestamp > max(timestamp)" + "apply data to destination".

With this requirement, a simple scheme functions like a semaphore, and functions well: put tableName in the cache, check whether tableName is already in the cache (and if so, wait), and remove it from the cache after the job is done.  I could accomplish that by using your DCache code to build a custom RemoveDistributedMapCache.
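That semaphore pattern can be sketched in plain Python (a toy model: the dict stands in for the cache server, and the function and table names are invented; in the real flow these steps would be PutDistributedMapCache, a wait loop, and the Groovy cache script):

```python
import time

cache = {}  # stands in for the DistributedMapCacheServer

def try_lock(table):
    """Put tableName in the cache only if absent; True means we hold the lock."""
    if table in cache:
        return False
    cache[table] = "locked"
    return True

def unlock(table):
    """The RemoveDistributedMapCache step: release the lock when the job is done."""
    cache.pop(table, None)

def incremental_load(table, log):
    # The three steps that must run without another flow touching the same table.
    log.append(f"read max(timestamp) from target {table}")
    log.append(f"read source rows of {table} newer than max(timestamp)")
    log.append(f"apply rows to target {table}")

def process(table, log):
    while not try_lock(table):   # a duplicate is in flight: wait instead of running
        time.sleep(0.01)
    try:
        incremental_load(table, log)
    finally:
        unlock(table)            # always release, even if the load fails

log = []
process("CUSTOMERS", log)
process("CUSTOMERS", log)        # second flow for the same table runs after, not during
```

The `finally` matters: if the lock is not released on failure, every later flow for that table waits forever, which is the single-point-of-failure risk of any cache-based semaphore.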

I think RemoveDistributedMapCache could also be useful for other use cases where a simple semaphore is sufficient.

Thanks 

Carlos




Re: RemoveDistributedMapCache

Posted by Matt Burgess <ma...@apache.org>.
Carlos,

Do you have the "duplicate" relationship being routed back to the
DetectDuplicate?  Also if you are trying to do different tables in
"parallel" (which is just "concurrent" on a single NiFi instance), I
assume you have set Max Concurrent Tasks in IncrementalLoadData to
something greater than 1?  How are you populating your
DistributedMapCache?  With a "run-once" separate flow?

An issue I see here is that once the table name is removed from the
cache, then unless something "resets the lock", all future flow files
with that table name will still be processed. If for some reason you
are guaranteed to only have 2 duplicate flow files per table name,
this might work (although you would have to manually refresh the cache
before running again).  An alternative might be to replace
DetectDuplicate with PutDistributedMapCache, setting the Cache Update
Strategy to "keep original". Then the flow files will have a "cached"
attribute, where the first flow file (i.e. when the cache value is not
present) will have "cached" set to true and the remainder (until the
cache entry is removed) will have "cached" false, so you can use
RouteOnAttribute to send the non-cached flow files back to
PutDistributedMapCache.
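That "keep original" behavior can be modeled like this (a sketch: the dict stands in for the cache server, and the returned flag simplifies what NiFi writes into the flow file's "cached" attribute):

```python
cache = {}

def put_keep_original(key, value):
    """Models PutDistributedMapCache with Cache Update Strategy = 'keep original':
    the put succeeds only when the key is absent; the returned flag is what would
    land in the flow file's 'cached' attribute."""
    if key in cache:
        return False   # entry already present -> cached = "false"
    cache[key] = value
    return True        # first writer wins -> cached = "true"

def remove_entry(key):
    """The missing RemoveDistributedMapCache step (or the Groovy script)."""
    cache.pop(key, None)

first = put_keep_original("TABLE_A", "flowfile-1")   # cached = true: proceed
second = put_keep_original("TABLE_A", "flowfile-2")  # cached = false: RouteOnAttribute
                                                     # loops it back to try again
remove_entry("TABLE_A")                              # first flow finishes and removes...
retry = put_keep_original("TABLE_A", "flowfile-2")   # ...so the retry acquires the entry
```

So each table name admits exactly one flow file at a time, and the looped-back duplicates keep retrying until the entry is removed.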

In this solution you'd still be relying on a
"RemoveDistributedMapCache" processor which does not exist, but you
could use the Groovy script as described in my last response.

This "lock" idea is interesting, I will give it some more thought,
perhaps there is something we could do in the framework and/or
extensions to enable this, if it is a common use case.  There have
been different Jira cases and discussions about barriers and general
aggregation patterns, though I'm not sure how/if they'd apply here.

Regards,
Matt


RE: RemoveDistributedMapCache

Posted by "Carlos Manuel Fernandes (DSI)" <ca...@cgd.pt>.
Thanks, Matt, for your quick response.

My problem isn't processing the same table twice, but guaranteeing I don't process the same table at the same time; what I wish to achieve is a synchronized process for each table.

Regards 
Carlos
