Posted to dev@apex.apache.org by Chandni Singh <ch...@datatorrent.com> on 2015/11/17 23:50:40 UTC

Refactoring of IdempotentStorageManager

Hi,

While working on ManagedState, it was brought up that
IdempotentStorageManager needs to be refactored:
1. Rename it (I have started a discussion about this in a separate thread).

2. It should be a layer over the write-ahead log (WAL) abstraction that was
created in HDHT.

This discussion is about the 2nd point.

Currently IdempotentStorageManager is an abstraction above StorageAgent
(which is in Apex core).
IMO this doesn't need to change. We have integrated
IdempotentStorageManager with various input/output operators in Malhar lib
and this abstraction works well.

However, the change I think we need to make (possibly later) is that
IdempotentStorageManager.FSIdempotentStorageManager can use (contain) a
WAL to write state to files.

Advantages of this approach:
1. Operators will not need to change because the API of
IdempotentStorageManager doesn't change. There are quite a few of them.
2. Parallel work is going on in the HDHT WAL, which currently makes it very
difficult to move code while retaining attribution.
3. Once the WAL abstraction is ready, FSIdempotentStorageManager can use it,
again without affecting the operators.
4. This is in line with iterative development :)
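
To make the layering concrete, here is a minimal sketch of a storage manager
that contains a WAL while its operator-facing API stays fixed. All names in it
(WriteAheadLog, InMemoryWal, IdempotentStore, WalBackedStore) are hypothetical
and simplified for illustration; they are not the actual Malhar or HDHT
interfaces, and the in-memory WAL only stands in for a file-backed one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical WAL abstraction; the real HDHT WAL interface may look different.
interface WriteAheadLog {
  void append(long windowId, byte[] entry);

  List<byte[]> read(long windowId);
}

// In-memory stand-in so the sketch runs without a file system.
class InMemoryWal implements WriteAheadLog {
  private final Map<Long, List<byte[]>> entries = new HashMap<>();

  @Override
  public void append(long windowId, byte[] entry) {
    entries.computeIfAbsent(windowId, id -> new ArrayList<>()).add(entry);
  }

  @Override
  public List<byte[]> read(long windowId) {
    return entries.getOrDefault(windowId, new ArrayList<>());
  }
}

// Simplified, hypothetical operator-facing API. Operators depend only on this.
interface IdempotentStore {
  void save(byte[] state, long windowId);

  List<byte[]> load(long windowId);
}

// The storage manager contains a WAL and delegates to it, so the WAL can be
// swapped or moved later without touching any operator.
class WalBackedStore implements IdempotentStore {
  private final WriteAheadLog wal;

  WalBackedStore(WriteAheadLog wal) {
    this.wal = wal;
  }

  @Override
  public void save(byte[] state, long windowId) {
    wal.append(windowId, state);
  }

  @Override
  public List<byte[]> load(long windowId) {
    return wal.read(windowId);
  }
}
```

An operator would hold only an IdempotentStore reference, which is why
replacing the WAL implementation underneath should not require operator
changes.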

Chandni

On Fri, Nov 13, 2015 at 7:32 PM, Chandni Singh <ch...@datatorrent.com>
wrote:

> Let me know if anyone wants to collaborate with me on this.
>
> Thanks,
> Chandni
>
> On Tue, Nov 10, 2015 at 6:18 PM, Chandni Singh <ch...@datatorrent.com>
> wrote:
>
>> Have added some more details about a Bucket in the document. Have a look.
>>
>> On Sun, Nov 8, 2015 at 10:37 PM, Chandni Singh <ch...@datatorrent.com>
>> wrote:
>>
>>> Forgot to attach the link.
>>>
>>> https://docs.google.com/document/d/1gRWN9ufKSZSZD0N-pthlhpC9TZ8KwJ6hJlAX6nxl5f8/edit#heading=h.wlc0p58uzygb
>>>
>>>
>>> On Sun, Nov 8, 2015 at 10:36 PM, Chandni Singh <ch...@datatorrent.com>
>>> wrote:
>>>
>>>> Hi,
>>>> This contains the overview of large state management.
>>>> Some parts need more description, which I am working on, but please feel
>>>> free to go through it; any feedback is appreciated.
>>>>
>>>> Thanks,
>>>> Chandni
>>>>
>>>>
>>>> On Tue, Oct 20, 2015 at 8:31 AM, Pramod Immaneni <
>>>> pramod@datatorrent.com> wrote:
>>>>
>>>>> This is a much-needed component, Chandni.
>>>>>
>>>>> The API for the cache will be important, as users will be able to plug in
>>>>> different implementations in the future, such as those based on popular
>>>>> distributed in-memory caches. Ehcache is a popular cache mechanism and API
>>>>> that comes to mind. It comes bundled with a non-distributed implementation,
>>>>> but there are commercial distributed implementations of it as well, such as
>>>>> BigMemory.
>>>>>
>>>>> Given our needs for fault tolerance we may not be able to adopt the Ehcache
>>>>> API as is, but an extension of it might work. We would still provide a
>>>>> default implementation, but building on a well-recognized API will
>>>>> facilitate development of other implementations in the future based on
>>>>> popular implementations already available. We will need to investigate
>>>>> whether we can use the API as is or with relatively straightforward
>>>>> extensions, which would be a positive for using it. If the API turns out
>>>>> to deviate significantly from what we need, that would be a negative.
>>>>>
>>>>> Also, it would be great if we could support an iterator to scan all the
>>>>> keys, lazy loading as needed, since this need comes up from time to time
>>>>> in different scenarios such as change data capture calculations.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Mon, Oct 19, 2015 at 9:10 PM, Chandni Singh <
>>>>> chandni@datatorrent.com>
>>>>> wrote:
>>>>>
>>>>> > Hi All,
>>>>> >
>>>>> > While working on making the Join operator fault-tolerant, we realized the
>>>>> > need for a fault-tolerant cache in the Malhar library.
>>>>> >
>>>>> > This cache is useful for any operator that is stateful and stores
>>>>> > key/values for a very long period (more than an hour).
>>>>> >
>>>>> > The problem with just having a non-transient HashMap for the cache is that
>>>>> > over time this state will become so large that checkpointing it will be
>>>>> > very costly and will cause bigger issues.
>>>>> >
>>>>> > In order to address this we need to checkpoint the state incrementally,
>>>>> > i.e., save the difference in state at every application window.
>>>>> >
>>>>> > This brings forward the following broad requirements for the cache:
>>>>> > 1. The cache needs to have a max size and be backed by a filesystem.
>>>>> >
>>>>> > 2. When this max size is reached, adding more data to the cache should
>>>>> > evict older entries from memory.
>>>>> >
>>>>> > 3. To minimize cache misses, a block of data is loaded in memory.
>>>>> >
>>>>> > 4. The block or bucket to which a key belongs is provided by the user
>>>>> > (the operator in this case), as the information about closeness of keys
>>>>> > (which can potentially reduce future misses) is not known to the cache
>>>>> > but to the user.
>>>>> >
>>>>> > 5. Keys are lazily loaded in case of operator failure.
>>>>> >
>>>>> > 6. To offset the cost of loading a block of keys when there is a miss,
>>>>> > loading can be done asynchronously with a callback that indicates when
>>>>> > the key is available. This allows the operator to process other keys
>>>>> > that are in memory.
>>>>> >
>>>>> > 7. Data that is spilled over needs to be purged when it is no longer
>>>>> > needed.
>>>>> >
>>>>> >
>>>>> > In the past we solved this problem with BucketManager, which is not in
>>>>> > open source now, and there were also some limitations with the bucket
>>>>> > API. The biggest one is that it doesn't allow saving multiple values for
>>>>> > a key.
>>>>> >
>>>>> > My plan is to create a solution similar to BucketManager in Malhar with
>>>>> > an improved API.
>>>>> > The data will also be saved on HDFS in TFile, which provides better
>>>>> > performance when saving key/values.
>>>>> >
>>>>> > Thanks,
>>>>> > Chandni
>>>>> >
>>>>>
>>>>
>>>>
>>>
>>
>
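
Several of the cache requirements in the original proposal quoted above (a
max size, eviction of older entries, loading a whole bucket on a miss,
caller-supplied buckets, and multiple values per key) can be illustrated with
a small sketch. This is only one possible shape under stated assumptions:
BucketedCache and its methods are hypothetical names, the size limit is
counted in buckets, eviction is least-recently-used, and a plain in-memory
map stands in for the file system. Asynchronous loading and purging are left
out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the BucketManager or ManagedState API.
class BucketedCache<K, V> {
  private final int maxBucketsInMemory;
  // Access-ordered map: iteration starts at the least recently used bucket.
  private final LinkedHashMap<Long, Map<K, List<V>>> memory =
      new LinkedHashMap<>(16, 0.75f, true);
  // Stand-in for the file system that would hold spilled buckets.
  private final Map<Long, Map<K, List<V>>> spilled = new HashMap<>();

  BucketedCache(int maxBucketsInMemory) {
    if (maxBucketsInMemory < 1) {
      throw new IllegalArgumentException("maxBucketsInMemory must be at least 1");
    }
    this.maxBucketsInMemory = maxBucketsInMemory;
  }

  // The caller supplies the bucket id because only it knows which keys are close.
  void put(long bucketId, K key, V value) {
    bucket(bucketId).computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  // Returns all values saved for the key, or an empty list if there are none.
  List<V> get(long bucketId, K key) {
    return bucket(bucketId).getOrDefault(key, new ArrayList<>());
  }

  boolean isInMemory(long bucketId) {
    return memory.containsKey(bucketId);
  }

  // On a miss the whole bucket is brought into memory, then the least
  // recently used buckets are spilled until the limit is respected.
  private Map<K, List<V>> bucket(long bucketId) {
    Map<K, List<V>> bucket = memory.get(bucketId);
    if (bucket == null) {
      bucket = spilled.remove(bucketId);
      if (bucket == null) {
        bucket = new HashMap<>();
      }
      memory.put(bucketId, bucket);
      evictIfNeeded();
    }
    return bucket;
  }

  private void evictIfNeeded() {
    Iterator<Map.Entry<Long, Map<K, List<V>>>> it = memory.entrySet().iterator();
    while (memory.size() > maxBucketsInMemory && it.hasNext()) {
      Map.Entry<Long, Map<K, List<V>>> eldest = it.next();
      spilled.put(eldest.getKey(), eldest.getValue());
      it.remove();
    }
  }
}
```

With a limit of two buckets, touching a third bucket spills the least
recently used one, and a later read of a spilled key brings its whole bucket
back into memory.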

Re: Refactoring of IdempotentStorageManager

Posted by Chandni Singh <ch...@datatorrent.com>.
Hi Tushar,

We need to move the WAL from HDHT to Malhar/lib. The recent changes you
are making make it generic, and I mentioned there how it can be used
internally for idempotency.

Since you have worked mostly on the HDHT WAL, it would be great if you could
move it to Malhar/lib, preserving the history, and make the necessary changes
to HDHT. There is already a ticket for it:
https://malhar.atlassian.net/browse/APEX-99

Let me know if you would like to take this up.

Thanks,
Chandni

On Tue, Nov 17, 2015 at 9:57 PM, Tushar Gosavi <tu...@datatorrent.com>
wrote:

> Hi Chandni,
>
> Let me know if you need any help extending the HDHT WAL for this purpose.
> The current functionality supports writing objects sequentially to a file
> and reading them sequentially from the file. Serialization and
> deserialization need to be handled by upper-level code.
>
> Regards,
> - Tushar.
>

Re: Refactoring of IdempotentStorageManager

Posted by Tushar Gosavi <tu...@datatorrent.com>.
Hi Chandni,

Let me know if you need any help extending the HDHT WAL for this purpose. The
current functionality supports writing objects sequentially to a file and
reading them sequentially from the file. Serialization and deserialization
need to be handled by upper-level code.
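
As a rough illustration of that contract (sequential appends, sequential
reads, serialization left to the caller), here is a minimal sketch. It is not
the HDHT WAL code: SequentialLog is a hypothetical name, entries are opaque
byte arrays, and the length-prefixed on-disk layout is an assumption made for
the example.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a sequential log; not the HDHT WAL implementation.
class SequentialLog {

  // Appends one entry to the end of the file as a 4-byte length plus payload.
  static void append(File file, byte[] entry) {
    try (DataOutputStream out =
        new DataOutputStream(new FileOutputStream(file, true))) {
      out.writeInt(entry.length);
      out.write(entry);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Reads every entry back in the order it was written.
  static List<byte[]> readAll(File file) {
    List<byte[]> entries = new ArrayList<>();
    try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
      while (true) {
        int length;
        try {
          length = in.readInt();
        } catch (EOFException endOfLog) {
          break; // no further length prefix, so the log has ended
        }
        byte[] entry = new byte[length];
        in.readFully(entry); // a truncated payload surfaces as an error
        entries.add(entry);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return entries;
  }
}
```

The log never interprets the bytes it stores, so the layer above (an
idempotent storage manager, for example) decides how objects are turned into
bytes and back.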

Regards,
- Tushar.

