You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@nifi.apache.org by Nicholas Hughes <ni...@gmail.com> on 2016/12/15 21:13:54 UTC

DistributedMapCache w/ ListSFTP and FetchSFTP

I'm testing a simple List/Fetch setup on a 3 node cluster. I created a
DistributedMapCacheServer controller service with the default settings (no
SSL) and then created a DistributedMapCacheClientService that points at one
of the cluster hostnames. The ListSFTP processor is set to use the
Distributed Cache Service that I created.

The ListSFTP processor lists the same 100 source files from the remote
system on each node, and sends 300 Flow Files downstream to the FetchSFTP
processor. I thought that the map cache allowed the cluster nodes to
determine which files had already been listed by other cluster nodes...
maybe I'm missing something.

Any assistance is appreciated.

NiFi version 1.0.0 in HDF 2.0.1


-Nick

Re: DistributedMapCache w/ ListSFTP and FetchSFTP

Posted by Bryan Bende <bb...@gmail.com>.
Yes from a quick look at the code, ListSFTP should be able to work fine
with out the distributed cache.

If you are interested, the relevant code is in the updateState method of
this class:

https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java

On Thu, Dec 15, 2016 at 9:09 PM Nicholas Hughes <
nicholasmhughes.nifi@gmail.com> wrote:

> Thanks for the explanation Bryan. Do you know offhand if ListSFTP
> specifically has the logic to check for the distributed cache first, and
> then will fail over to use the more recent state management? If so, I
> should be able to remove the reference to the distributed map cache client
> service and still retain the desired functionality, correct?
>
> -Nick
>
>
> On Thu, Dec 15, 2016 at 9:02 PM, Bryan Bende <bb...@gmail.com> wrote:
>
> I believe that Pierre's last point about this processor being developed
> before NiFi's built in state management feature is correct. Many processors
> would originally store state in a local file as well as in the distributed
> cache, they were still meant to run only on the primary node,  but this way
> if the primary node went down and moved to a new node, it could use the
> distributed cache to pick up in the same place.
>
> Later on the state management feature (backed by ZooKeeper) was introduced
> and many of these processors were converted to use this instead. However,
> most of them still first check the distributed cache to see if there is any
> state that needs to be migrated to ZooKeeper to handle the case where
> someone is starting for the first time after upgrading from pre-state
> management.
>
> We are probably well past the point where we can remove this logic and if
> anyone is upgrading from a version pre-state management (0.5.0??) then they
> would upgrade to that first then upgrade again to the latest 1.x release.
>
> On Thu, Dec 15, 2016 at 5:01 PM Pierre Villard <
> pierre.villard.fr@gmail.com> wrote:
>
> Not sure I'm following you on "So, the DMC is just so you won't duplicate
> fetches if you're listing faster than you're fetching... got it". :)
>
> Let's say the DMC is just here to store the state of the List processor
> across the cluster in case the node goes down and a new primary node is
> elected. But this is not really related to the Fetch processor (I may have
> been misleading in my previous answer). Thanks to the state (timestamp
> based IIRC), the List processor won't list the same file twice and it
> ensures that you won't get duplicates.
>
> The fact that we are using the DMC instead of the states provided by the
> NiFi framework is maybe related to the fact that this processor has been
> developed more than one year ago (and state management appeared about 11
> months ago). ListFile for example also stores a state but does not need a
> DMC. Maybe someone else can confirm or correct me if I'm wrong.
>
> In fact I think that this processor could be improved to get rid of the
> need of a DMC and relies on the NiFi framework to store the state of the
> processor.
>
> Pierre
>
>
>
> 2016-12-15 22:39 GMT+01:00 Nicholas Hughes <nicholasmhughes.nifi@gmail.com
> >:
>
> Pierre,
>
> Thank you for the quick response. So, the DMC is just so you won't
> duplicate fetches if you're listing faster than you're fetching... got it.
> The usage documentation is kinda vague about that, so I made it out to be
> more magical than it is. Thanks for pointing me in the right direction!
>
> -Nick
>
>
> On Thu, Dec 15, 2016 at 4:21 PM, Pierre Villard <
> pierre.villard.fr@gmail.com> wrote:
>
> Hi Nicholas,
>
> You need to configure your ListSFTP processor to only run on the primary
> node (scheduling strategy in processor configuration), then to send the
> flow files to a RPG that points to an input port in the cluster itself (so
> that flow files are distributed over the cluster and do not stay only on
> the primary node), then the FetchSFTP processor will take care of
> downloading the files. The ListSFTP, with its state (DistributedCache),
> ensures that you don't download the same file twice, and a given file won't
> be downloaded by two nodes at the same time.
>
> Hope this helps,
> Pierre.
>
> 2016-12-15 22:13 GMT+01:00 Nicholas Hughes <nicholasmhughes.nifi@gmail.com
> >:
>
> I'm testing a simple List/Fetch setup on a 3 node cluster. I created a
> DistributedMapCacheServer controller service with the default settings (no
> SSL) and then created a DistributedMapCacheClientService that points at one
> of the cluster hostnames. The ListSFTP processor is set to use the
> Distributed Cache Service that I created.
>
> The ListSFTP processor lists the same 100 source files from the remote
> system on each node, and sends 300 Flow Files downstream to the FetchSFTP
> processor. I thought that the map cache allowed the cluster nodes to
> determine which files had already been listed by other cluster nodes...
> maybe I'm missing something.
>
> Any assistance is appreciated.
>
> NiFi version 1.0.0 in HDF 2.0.1
>
>
> -Nick
>
>
>
>
>
>
>
>
>
>
>
>
> --
> Sent from Gmail Mobile
>
>
>
>
>
> --
Sent from Gmail Mobile

Re: DistributedMapCache w/ ListSFTP and FetchSFTP

Posted by Nicholas Hughes <ni...@gmail.com>.
Thanks for the explanation Bryan. Do you know offhand if ListSFTP
specifically has the logic to check for the distributed cache first, and
then will fail over to use the more recent state management? If so, I
should be able to remove the reference to the distributed map cache client
service and still retain the desired functionality, correct?

-Nick


On Thu, Dec 15, 2016 at 9:02 PM, Bryan Bende <bb...@gmail.com> wrote:

> I believe that Pierre's last point about this processor being developed
> before NiFi's built in state management feature is correct. Many processors
> would originally store state in a local file as well as in the distributed
> cache, they were still meant to run only on the primary node,  but this way
> if the primary node went down and moved to a new node, it could use the
> distributed cache to pick up in the same place.
>
> Later on the state management feature (backed by ZooKeeper) was introduced
> and many of these processors were converted to use this instead. However,
> most of them still first check the distributed cache to see if there is any
> state that needs to be migrated to ZooKeeper to handle the case where
> someone is starting for the first time after upgrading from pre-state
> management.
>
> We are probably well past the point where we can remove this logic and if
> anyone is upgrading from a version pre-state management (0.5.0??) then they
> would upgrade to that first then upgrade again to the latest 1.x release.
>
> On Thu, Dec 15, 2016 at 5:01 PM Pierre Villard <
> pierre.villard.fr@gmail.com> wrote:
>
>> Not sure I'm following you on "So, the DMC is just so you won't duplicate
>> fetches if you're listing faster than you're fetching... got it". :)
>>
>> Let's say the DMC is just here to store the state of the List processor
>> across the cluster in case the node goes down and a new primary node is
>> elected. But this is not really related to the Fetch processor (I may have
>> been misleading in my previous answer). Thanks to the state (timestamp
>> based IIRC), the List processor won't list the same file twice and it
>> ensures that you won't get duplicates.
>>
>> The fact that we are using the DMC instead of the states provided by the
>> NiFi framework is maybe related to the fact that this processor has been
>> developed more than one year ago (and state management appeared about 11
>> months ago). ListFile for example also stores a state but does not need a
>> DMC. Maybe someone else can confirm or correct me if I'm wrong.
>>
>> In fact I think that this processor could be improved to get rid of the
>> need of a DMC and relies on the NiFi framework to store the state of the
>> processor.
>>
>> Pierre
>>
>>
>>
>> 2016-12-15 22:39 GMT+01:00 Nicholas Hughes <nicholasmhughes.nifi@gmail.
>> com>:
>>
>> Pierre,
>>
>> Thank you for the quick response. So, the DMC is just so you won't
>> duplicate fetches if you're listing faster than you're fetching... got it.
>> The usage documentation is kinda vague about that, so I made it out to be
>> more magical than it is. Thanks for pointing me in the right direction!
>>
>> -Nick
>>
>>
>> On Thu, Dec 15, 2016 at 4:21 PM, Pierre Villard <
>> pierre.villard.fr@gmail.com> wrote:
>>
>> Hi Nicholas,
>>
>> You need to configure your ListSFTP processor to only run on the primary
>> node (scheduling strategy in processor configuration), then to send the
>> flow files to a RPG that points to an input port in the cluster itself (so
>> that flow files are distributed over the cluster and do not stay only on
>> the primary node), then the FetchSFTP processor will take care of
>> downloading the files. The ListSFTP, with its state (DistributedCache),
>> ensures that you don't download the same file twice, and a given file won't
>> be downloaded by two nodes at the same time.
>>
>> Hope this helps,
>> Pierre.
>>
>> 2016-12-15 22:13 GMT+01:00 Nicholas Hughes <nicholasmhughes.nifi@gmail.
>> com>:
>>
>> I'm testing a simple List/Fetch setup on a 3 node cluster. I created a
>> DistributedMapCacheServer controller service with the default settings (no
>> SSL) and then created a DistributedMapCacheClientService that points at
>> one of the cluster hostnames. The ListSFTP processor is set to use the
>> Distributed Cache Service that I created.
>>
>> The ListSFTP processor lists the same 100 source files from the remote
>> system on each node, and sends 300 Flow Files downstream to the FetchSFTP
>> processor. I thought that the map cache allowed the cluster nodes to
>> determine which files had already been listed by other cluster nodes...
>> maybe I'm missing something.
>>
>> Any assistance is appreciated.
>>
>> NiFi version 1.0.0 in HDF 2.0.1
>>
>>
>> -Nick
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
> Sent from Gmail Mobile
>

Re: DistributedMapCache w/ ListSFTP and FetchSFTP

Posted by Bryan Bende <bb...@gmail.com>.
I believe that Pierre's last point about this processor being developed
before NiFi's built in state management feature is correct. Many processors
would originally store state in a local file as well as in the distributed
cache, they were still meant to run only on the primary node,  but this way
if the primary node went down and moved to a new node, it could use the
distributed cache to pick up in the same place.

Later on the state management feature (backed by ZooKeeper) was introduced
and many of these processors were converted to use this instead. However,
most of them still first check the distributed cache to see if there is any
state that needs to be migrated to ZooKeeper to handle the case where
someone is starting for the first time after upgrading from pre-state
management.

We are probably well past the point where we can remove this logic and if
anyone is upgrading from a version pre-state management (0.5.0??) then they
would upgrade to that first then upgrade again to the latest 1.x release.

On Thu, Dec 15, 2016 at 5:01 PM Pierre Villard <pi...@gmail.com>
wrote:

> Not sure I'm following you on "So, the DMC is just so you won't duplicate
> fetches if you're listing faster than you're fetching... got it". :)
>
> Let's say the DMC is just here to store the state of the List processor
> across the cluster in case the node goes down and a new primary node is
> elected. But this is not really related to the Fetch processor (I may have
> been misleading in my previous answer). Thanks to the state (timestamp
> based IIRC), the List processor won't list the same file twice and it
> ensures that you won't get duplicates.
>
> The fact that we are using the DMC instead of the states provided by the
> NiFi framework is maybe related to the fact that this processor has been
> developed more than one year ago (and state management appeared about 11
> months ago). ListFile for example also stores a state but does not need a
> DMC. Maybe someone else can confirm or correct me if I'm wrong.
>
> In fact I think that this processor could be improved to get rid of the
> need of a DMC and relies on the NiFi framework to store the state of the
> processor.
>
> Pierre
>
>
>
> 2016-12-15 22:39 GMT+01:00 Nicholas Hughes <nicholasmhughes.nifi@gmail.com
> >:
>
> Pierre,
>
> Thank you for the quick response. So, the DMC is just so you won't
> duplicate fetches if you're listing faster than you're fetching... got it.
> The usage documentation is kinda vague about that, so I made it out to be
> more magical than it is. Thanks for pointing me in the right direction!
>
> -Nick
>
>
> On Thu, Dec 15, 2016 at 4:21 PM, Pierre Villard <
> pierre.villard.fr@gmail.com> wrote:
>
> Hi Nicholas,
>
> You need to configure your ListSFTP processor to only run on the primary
> node (scheduling strategy in processor configuration), then to send the
> flow files to a RPG that points to an input port in the cluster itself (so
> that flow files are distributed over the cluster and do not stay only on
> the primary node), then the FetchSFTP processor will take care of
> downloading the files. The ListSFTP, with its state (DistributedCache),
> ensures that you don't download the same file twice, and a given file won't
> be downloaded by two nodes at the same time.
>
> Hope this helps,
> Pierre.
>
> 2016-12-15 22:13 GMT+01:00 Nicholas Hughes <nicholasmhughes.nifi@gmail.com
> >:
>
> I'm testing a simple List/Fetch setup on a 3 node cluster. I created a
> DistributedMapCacheServer controller service with the default settings (no
> SSL) and then created a DistributedMapCacheClientService that points at one
> of the cluster hostnames. The ListSFTP processor is set to use the
> Distributed Cache Service that I created.
>
> The ListSFTP processor lists the same 100 source files from the remote
> system on each node, and sends 300 Flow Files downstream to the FetchSFTP
> processor. I thought that the map cache allowed the cluster nodes to
> determine which files had already been listed by other cluster nodes...
> maybe I'm missing something.
>
> Any assistance is appreciated.
>
> NiFi version 1.0.0 in HDF 2.0.1
>
>
> -Nick
>
>
>
>
>
>
>
>
>
>
>
>
> --
Sent from Gmail Mobile

Re: DistributedMapCache w/ ListSFTP and FetchSFTP

Posted by Pierre Villard <pi...@gmail.com>.
Not sure I'm following you on "So, the DMC is just so you won't duplicate
fetches if you're listing faster than you're fetching... got it". :)

Let's say the DMC is just here to store the state of the List processor
across the cluster in case the node goes down and a new primary node is
elected. But this is not really related to the Fetch processor (I may have
been misleading in my previous answer). Thanks to the state (timestamp
based IIRC), the List processor won't list the same file twice and it
ensures that you won't get duplicates.

The fact that we are using the DMC instead of the states provided by the
NiFi framework is maybe related to the fact that this processor has been
developed more than one year ago (and state management appeared about 11
months ago). ListFile for example also stores a state but does not need a
DMC. Maybe someone else can confirm or correct me if I'm wrong.

In fact I think that this processor could be improved to get rid of the
need of a DMC and relies on the NiFi framework to store the state of the
processor.

Pierre



2016-12-15 22:39 GMT+01:00 Nicholas Hughes <ni...@gmail.com>:

> Pierre,
>
> Thank you for the quick response. So, the DMC is just so you won't
> duplicate fetches if you're listing faster than you're fetching... got it.
> The usage documentation is kinda vague about that, so I made it out to be
> more magical than it is. Thanks for pointing me in the right direction!
>
> -Nick
>
>
> On Thu, Dec 15, 2016 at 4:21 PM, Pierre Villard <
> pierre.villard.fr@gmail.com> wrote:
>
>> Hi Nicholas,
>>
>> You need to configure your ListSFTP processor to only run on the primary
>> node (scheduling strategy in processor configuration), then to send the
>> flow files to a RPG that points to an input port in the cluster itself (so
>> that flow files are distributed over the cluster and do not stay only on
>> the primary node), then the FetchSFTP processor will take care of
>> downloading the files. The ListSFTP, with its state (DistributedCache),
>> ensures that you don't download the same file twice, and a given file won't
>> be downloaded by two nodes at the same time.
>>
>> Hope this helps,
>> Pierre.
>>
>> 2016-12-15 22:13 GMT+01:00 Nicholas Hughes <nicholasmhughes.nifi@gmail.co
>> m>:
>>
>>> I'm testing a simple List/Fetch setup on a 3 node cluster. I created a
>>> DistributedMapCacheServer controller service with the default settings (no
>>> SSL) and then created a DistributedMapCacheClientService that points at
>>> one of the cluster hostnames. The ListSFTP processor is set to use the
>>> Distributed Cache Service that I created.
>>>
>>> The ListSFTP processor lists the same 100 source files from the remote
>>> system on each node, and sends 300 Flow Files downstream to the FetchSFTP
>>> processor. I thought that the map cache allowed the cluster nodes to
>>> determine which files had already been listed by other cluster nodes...
>>> maybe I'm missing something.
>>>
>>> Any assistance is appreciated.
>>>
>>> NiFi version 1.0.0 in HDF 2.0.1
>>>
>>>
>>> -Nick
>>>
>>>
>>
>

Re: DistributedMapCache w/ ListSFTP and FetchSFTP

Posted by Nicholas Hughes <ni...@gmail.com>.
Pierre,

Thank you for the quick response. So, the DMC is just so you won't
duplicate fetches if you're listing faster than you're fetching... got it.
The usage documentation is kinda vague about that, so I made it out to be
more magical than it is. Thanks for pointing me in the right direction!

-Nick


On Thu, Dec 15, 2016 at 4:21 PM, Pierre Villard <pierre.villard.fr@gmail.com
> wrote:

> Hi Nicholas,
>
> You need to configure your ListSFTP processor to only run on the primary
> node (scheduling strategy in processor configuration), then to send the
> flow files to a RPG that points to an input port in the cluster itself (so
> that flow files are distributed over the cluster and do not stay only on
> the primary node), then the FetchSFTP processor will take care of
> downloading the files. The ListSFTP, with its state (DistributedCache),
> ensures that you don't download the same file twice, and a given file won't
> be downloaded by two nodes at the same time.
>
> Hope this helps,
> Pierre.
>
> 2016-12-15 22:13 GMT+01:00 Nicholas Hughes <nicholasmhughes.nifi@gmail.com
> >:
>
>> I'm testing a simple List/Fetch setup on a 3 node cluster. I created a
>> DistributedMapCacheServer controller service with the default settings (no
>> SSL) and then created a DistributedMapCacheClientService that points at
>> one of the cluster hostnames. The ListSFTP processor is set to use the
>> Distributed Cache Service that I created.
>>
>> The ListSFTP processor lists the same 100 source files from the remote
>> system on each node, and sends 300 Flow Files downstream to the FetchSFTP
>> processor. I thought that the map cache allowed the cluster nodes to
>> determine which files had already been listed by other cluster nodes...
>> maybe I'm missing something.
>>
>> Any assistance is appreciated.
>>
>> NiFi version 1.0.0 in HDF 2.0.1
>>
>>
>> -Nick
>>
>>
>

Re: DistributedMapCache w/ ListSFTP and FetchSFTP

Posted by Pierre Villard <pi...@gmail.com>.
Hi Nicholas,

You need to configure your ListSFTP processor to only run on the primary
node (scheduling strategy in processor configuration), then to send the
flow files to a RPG that points to an input port in the cluster itself (so
that flow files are distributed over the cluster and do not stay only on
the primary node), then the FetchSFTP processor will take care of
downloading the files. The ListSFTP, with its state (DistributedCache),
ensures that you don't download the same file twice, and a given file won't
be downloaded by two nodes at the same time.

Hope this helps,
Pierre.

2016-12-15 22:13 GMT+01:00 Nicholas Hughes <ni...@gmail.com>:

> I'm testing a simple List/Fetch setup on a 3 node cluster. I created a
> DistributedMapCacheServer controller service with the default settings (no
> SSL) and then created a DistributedMapCacheClientService that points at
> one of the cluster hostnames. The ListSFTP processor is set to use the
> Distributed Cache Service that I created.
>
> The ListSFTP processor lists the same 100 source files from the remote
> system on each node, and sends 300 Flow Files downstream to the FetchSFTP
> processor. I thought that the map cache allowed the cluster nodes to
> determine which files had already been listed by other cluster nodes...
> maybe I'm missing something.
>
> Any assistance is appreciated.
>
> NiFi version 1.0.0 in HDF 2.0.1
>
>
> -Nick
>
>