Posted to dev@apex.apache.org by Ashish Tadose <as...@gmail.com> on 2015/12/02 17:51:47 UTC

Operator checkpointing in distributed in-memory store

Hi All,

Currently the Apex engine provides operator checkpointing in HDFS (with
HDFS-backed StorageAgents, i.e. FSStorageAgent & AsyncFSStorageAgent).

We have observed that for applications with a large number of operator
instances, HDFS checkpointing introduces latency in the DAG, which degrades
overall application performance.
To resolve this we had to review all operators in the DAG and make a few
operators stateless.

As operator checkpointing is critical functionality of the Apex streaming
platform for ensuring fault-tolerant behavior, the platform should also
provide alternate StorageAgents that work seamlessly with large applications
that require exactly-once semantics.

HDFS read/write latency does not improve beyond a certain point because of
disk I/O & staging writes. Having an alternate strategy that checkpoints to
a fault-tolerant distributed in-memory grid would ensure that application
stability and performance are not impacted.

I have developed an in-memory storage agent which I would like to contribute
as an alternate StorageAgent for checkpointing.
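
To illustrate the idea, here is a minimal, self-contained sketch. It is not
the contributed agent. `CheckpointAgent` and `MapBackedAgent` are hypothetical
names for this example only, with methods modeled on a save / load / delete /
list-windows contract. A `ConcurrentHashMap` stands in for the distributed
in-memory grid, Java serialization stands in for whatever serializer the
platform uses, and the operator and window IDs are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;

class InMemoryCheckpointSketch {

  // Hypothetical contract for this sketch only; not copied from the Apex API.
  interface CheckpointAgent {
    void save(Object state, int operatorId, long windowId);
    Object load(int operatorId, long windowId);
    void delete(int operatorId, long windowId);
    long[] getWindowIds(int operatorId);
  }

  // A local map stands in for the distributed in-memory grid (assumption).
  static class MapBackedAgent implements CheckpointAgent {
    // operatorId -> (windowId -> serialized state), windows kept sorted.
    private final ConcurrentMap<Integer, ConcurrentSkipListMap<Long, byte[]>> store =
        new ConcurrentHashMap<>();

    @Override
    public void save(Object state, int operatorId, long windowId) {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(state);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      store.computeIfAbsent(operatorId, id -> new ConcurrentSkipListMap<>())
          .put(windowId, bytes.toByteArray());
    }

    @Override
    public Object load(int operatorId, long windowId) {
      ConcurrentSkipListMap<Long, byte[]> windows = store.get(operatorId);
      byte[] data = (windows == null) ? null : windows.get(windowId);
      if (data == null) {
        throw new NoSuchElementException(
            "No checkpoint for operator " + operatorId + " at window " + windowId);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
        return in.readObject();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      } catch (ClassNotFoundException e) {
        throw new IllegalStateException(e);
      }
    }

    @Override
    public void delete(int operatorId, long windowId) {
      ConcurrentSkipListMap<Long, byte[]> windows = store.get(operatorId);
      if (windows != null) {
        windows.remove(windowId);
      }
    }

    @Override
    public long[] getWindowIds(int operatorId) {
      ConcurrentSkipListMap<Long, byte[]> windows = store.get(operatorId);
      if (windows == null) {
        return new long[0];
      }
      // Key set of a skip-list map iterates in ascending order.
      return windows.keySet().stream().mapToLong(Long::longValue).toArray();
    }
  }

  public static void main(String[] args) {
    CheckpointAgent agent = new MapBackedAgent();
    HashMap<String, Long> counts = new HashMap<>();
    counts.put("clicks", 10L);
    agent.save(counts, 7, 100L);   // checkpoint at window 100
    counts.put("clicks", 25L);
    agent.save(counts, 7, 101L);   // checkpoint at window 101

    // Recovery: restore from the most recent checkpointed window.
    long[] windows = agent.getWindowIds(7);
    long latest = windows[windows.length - 1];
    System.out.println("recovered window " + latest + ": " + agent.load(7, latest));

    agent.delete(7, 100L);         // older checkpoint is no longer needed
  }
}
```

With the real API, such an agent would be plugged in through the
STORAGE_AGENT attribute, as in the
`dag.setAttribute(OperatorContext.STORAGE_AGENT, agent)` call quoted further
down this thread.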

Thanks,
Ashish

Re: Operator checkpointing in distributed in-memory store

Posted by Ashish Tadose <as...@gmail.com>.
Hi Thomas,

I have created the JIRA below and added a pull request for code review.
https://issues.apache.org/jira/browse/APEXCORE-283

We have also developed input & output operators for Geode, which we will be
contributing soon.

Thx,
Ashish


On Sat, Dec 5, 2015 at 12:02 AM, Thomas Weise <th...@datatorrent.com>
wrote:

> Hi Ashish,
>
> Nice, looking forward to seeing the support for Geode! Have you created any
> JIRAs for your work yet?
>
> Just in case you have not seen it yet, here are the contributor guidelines:
> http://apex.incubator.apache.org/contributing.html
>
> Thanks,
> Thomas

Re: Operator checkpointing in distributed in-memory store

Posted by Thomas Weise <th...@datatorrent.com>.
Hi Ashish,

Nice, looking forward to seeing the support for Geode! Have you created any
JIRAs for your work yet?

Just in case you have not seen it yet, here are the contributor guidelines:
http://apex.incubator.apache.org/contributing.html

Thanks,
Thomas

On Fri, Dec 4, 2015 at 9:49 AM, Ashish Tadose <as...@gmail.com>
wrote:

> Gaurav, Sandesh
>
> PFB my comments in *bold*
>
> 1. Are there standard APIs for distributed In-Memory stores or is this
> implementation specific to one particular tool?
> *I have developed a concrete implementation with Apache Geode -
> http://geode.incubator.apache.org/*
> *However, for this feature contribution I am adding a KeyValue store
> interface and an abstract implementation, so that any KeyValue store can be
> plugged in as a storage agent.*
>
> 2. Will In-Memory Store compete with DataTorrent Apps for cluster resources
> (memory/cpu)?
> *Probably not. The in-memory store would be a separately managed cluster,
> which may not be part of the YARN environment.*
>
> 3. What is the purging policy? Who is responsible for cleaning up the
> resources for completed/failed/aborted applications? This becomes important
> when you want to launch an Application using previous Application Id
> *The in-memory storage agent would support checkpoint deletion, which the
> platform calls periodically during the application lifetime.*
> *Purging the checkpoints of older applications will be taken care of by the
> application developer or the admin who manages the in-memory cluster. The
> same is the case with the HDFS storage agents, where the user has to
> manually delete old applications and their checkpoint data.*
>
> 4. What all in-memory store did you evaluate?  Are they YARN compatible?
> *I have a concrete implementation of a Geode storage agent, which I will be
> contributing along with this feature.*
>
> Thanks,
> Ashish

Re: Operator checkpointing in distributed in-memory store

Posted by Chandni Singh <ch...@datatorrent.com>.
Ashish,

This sounds very interesting and useful. Looking forward to the
implementation.

Chandni

On Fri, Dec 4, 2015 at 9:49 AM, Ashish Tadose <as...@gmail.com>
wrote:

> Gaurav, Sandesh
>
> PFB my comments in *bold*
>
> 1. Are there standard APIs for distributed In-Memory stores or is this
> implementation specific to one particular tool?
> *I have developed a concrete implementation with Apache Geode -
> http://geode.incubator.apache.org/*
> *However, for this feature contribution I am adding a KeyValue store
> interface and an abstract implementation, so that any KeyValue store can be
> plugged in as a storage agent.*
>
> 2. Will In-Memory Store compete with DataTorrent Apps for cluster resources
> (memory/cpu)?
> *Probably not. The in-memory store would be a separately managed cluster,
> which may not be part of the YARN environment.*
>
> 3. What is the purging policy? Who is responsible for cleaning up the
> resources for completed/failed/aborted applications? This becomes important
> when you want to launch an Application using previous Application Id
> *The in-memory storage agent would support checkpoint deletion, which the
> platform calls periodically during the application lifetime.*
> *Purging the checkpoints of older applications will be taken care of by the
> application developer or the admin who manages the in-memory cluster. The
> same is the case with the HDFS storage agents, where the user has to
> manually delete old applications and their checkpoint data.*
>
> 4. What all in-memory store did you evaluate?  Are they YARN compatible?
> *I have a concrete implementation of a Geode storage agent, which I will be
> contributing along with this feature.*
>
> Thanks,
> Ashish

Re: Operator checkpointing in distributed in-memory store

Posted by Ashish Tadose <as...@gmail.com>.
Gaurav, Sandesh

PFB my comments in *bold*

1. Are there standard APIs for distributed In-Memory stores or is this
implementation specific to one particular tool?
*I have developed a concrete implementation with Apache Geode -
http://geode.incubator.apache.org/*
*However, for this feature contribution I am adding a KeyValue store
interface and an abstract implementation, so that any KeyValue store can be
plugged in as a storage agent.*

2. Will In-Memory Store compete with DataTorrent Apps for cluster resources
(memory/cpu)?
*Probably not. The in-memory store would be a separately managed cluster,
which may not be part of the YARN environment.*

3. What is the purging policy? Who is responsible for cleaning up the
resources for completed/failed/aborted applications? This becomes important
when you want to launch an Application using previous Application Id
*The in-memory storage agent would support checkpoint deletion, which the
platform calls periodically during the application lifetime.*
*Purging the checkpoints of older applications will be taken care of by the
application developer or the admin who manages the in-memory cluster. The
same is the case with the HDFS storage agents, where the user has to
manually delete old applications and their checkpoint data.*

4. What all in-memory store did you evaluate?  Are they YARN compatible?
*I have a concrete implementation of a Geode storage agent, which I will be
contributing along with this feature.*
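
As a rough illustration of answers 1 and 3 above, the sketch below shows one
way a pluggable key-value layer and a manual purge step could fit together.
It is an assumption-laden example, not the code in the pull request:
`KeyValueStore`, `AbstractKeyValueAgent`, `MapStore`, `MapAgent`,
`purgeApplication`, the `applicationId/operatorId/windowId` key layout, and
the sample application IDs are all hypothetical, and a local map stands in
for a real grid such as Geode.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class KeyValueAgentSketch {

  // Hypothetical minimal contract a backing store would have to satisfy.
  interface KeyValueStore {
    void put(String key, byte[] value);
    byte[] get(String key);
    void remove(String key);
    Set<String> keys();
  }

  // Local stand-in for a distributed store (assumption for this sketch).
  static class MapStore implements KeyValueStore {
    private final ConcurrentMap<String, byte[]> map = new ConcurrentHashMap<>();

    @Override public void put(String key, byte[] value) { map.put(key, value); }
    @Override public byte[] get(String key) { return map.get(key); }
    @Override public void remove(String key) { map.remove(key); }
    // Return a copy so callers can remove entries while iterating.
    @Override public Set<String> keys() { return new TreeSet<>(map.keySet()); }
  }

  // Store-independent logic lives here; subclasses only supply the store.
  abstract static class AbstractKeyValueAgent {
    private final String applicationId;

    protected AbstractKeyValueAgent(String applicationId) {
      this.applicationId = applicationId;
    }

    protected abstract KeyValueStore store();

    // Hypothetical key layout: applicationId/operatorId/windowId.
    private String key(int operatorId, long windowId) {
      return applicationId + "/" + operatorId + "/" + windowId;
    }

    public void save(byte[] state, int operatorId, long windowId) {
      store().put(key(operatorId, windowId), state);
    }

    public byte[] load(int operatorId, long windowId) {
      return store().get(key(operatorId, windowId));
    }

    // Per-checkpoint delete, the kind of call made during the application lifetime.
    public void delete(int operatorId, long windowId) {
      store().remove(key(operatorId, windowId));
    }
  }

  static class MapAgent extends AbstractKeyValueAgent {
    private final KeyValueStore store;

    MapAgent(String applicationId, KeyValueStore store) {
      super(applicationId);
      this.store = store;
    }

    @Override protected KeyValueStore store() { return store; }
  }

  // Manual cleanup of everything left behind by one application.
  static int purgeApplication(KeyValueStore store, String applicationId) {
    String prefix = applicationId + "/";
    int removed = 0;
    for (String key : store.keys()) {
      if (key.startsWith(prefix)) {
        store.remove(key);
        removed++;
      }
    }
    return removed;
  }

  public static void main(String[] args) {
    MapStore store = new MapStore();
    MapAgent oldRun = new MapAgent("app_0001", store);
    MapAgent newRun = new MapAgent("app_0002", store);
    oldRun.save(new byte[] {1}, 1, 100L);
    newRun.save(new byte[] {2}, 1, 100L);
    System.out.println("purged " + purgeApplication(store, "app_0001") + " checkpoint(s)");
  }
}
```

Including the application ID in the key is what makes a per-application
purge possible, and it is also why the agent needs the application ID passed
in from the client side.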

Thanks,
Ashish


On Fri, Dec 4, 2015 at 12:45 AM, Sandesh Hegde <sa...@datatorrent.com>
wrote:

> Ashish,
>
> Two more questions for you,
> What all in-memory store did you evaluate?  Are they YARN compatible?
>
> Thank you for your contribution.
>
> Sandesh
>
> On Wed, Dec 2, 2015 at 10:53 AM Gaurav Gupta <ga...@datatorrent.com>
> wrote:
>
> > Ashish,
> >
> > I have a couple of questions
> > 1. Are there standard APIs for distributed In-Memory stores or is this
> > implementation specific to one particular tool?
> > 2. Will In-Memory Store compete with DataTorrent Apps for cluster
> > resources (memory/cpu)?
> > 3. What is the purging policy? Who is responsible for cleaning up the
> > resources for completed/failed/aborted applications? This becomes
> > important when you want to launch an Application using previous
> > Application Id
> >
> > Thanks
> > - Gaurav
> >
> > > On Dec 2, 2015, at 10:07 AM, Ashish Tadose <as...@gmail.com> wrote:
> > >
> > > Thanks Gaurav,
> > >
> > > I have finished a baseline implementation of the StorageAgent and also
> > > tested it with demo applications by explicitly specifying it in the DAG
> > > configuration as below, and it works fine.
> > >
> > > dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
> > >
> > > I also had to make some changes to StramClient to pass additional
> > > information such as applicationId, as it does not pass it currently.
> > >
> > > I am going to create a JIRA task for this feature and will document the
> > > design & implementation strategy there.
> > >
> > > Thx,
> > > Ashish
> > >
> > >
> > > On Wed, Dec 2, 2015 at 11:26 PM, Gaurav Gupta <ga...@datatorrent.com>
> > > wrote:
> > >
> > >> Just to add, you can plug in your storage agent using the attribute
> > >> STORAGE_AGENT (
> > >> https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Context.OperatorContext.html#STORAGE_AGENT
> > >> )
> > >>
> > >> Thanks
> > >> - Gaurav
> > >>
> > >>> On Dec 2, 2015, at 9:51 AM, Gaurav Gupta <ga...@datatorrent.com> wrote:
> > >>>
> > >>> Ashish,
> > >>>
> > >>> You are right that Exactly once semantics can’t be achieved through
> > >>> Async FS write.
> > >>> Did you try the new StorageAgent with your Application? If yes, do you
> > >>> have any numbers to compare?
> > >>>
> > >>> Thanks
> > >>> - Gaurav
> > >>>
> > >>>> On Dec 2, 2015, at 9:33 AM, Ashish Tadose <ashishtadose@gmail.com> wrote:
> > >>>>
> > >>>> Application uses a large number of in-memory dimension store
> > >>>> partitions to hold high cardinality aggregated data, and many
> > >>>> intermediate operators also keep cache data for reference lookups,
> > >>>> which is non-transient.
> > >>>>
> > >>>> Total application partitions were more than 1000, which means a lot
> > >>>> of operators to checkpoint and in turn a lot of frequent HDFS write,
> > >>>> rename & delete operations, which became the bottleneck.
> > >>>>
> > >>>> Application requires Exactly once semantics with idempotent operators,
> > >>>> which I suppose cannot be achieved through Async FS writes; please
> > >>>> correct me if I'm wrong here.
> > >>>>
> > >>>> Also, the application computes streaming aggregations of high
> > >>>> cardinality incoming data streams and reference caches are updated
> > >>>> frequently, so I am not sure how much incremental checkpointing will
> > >>>> help here.
> > >>>>
> > >>>> Despite this specific application, I strongly think it would be good
> > >>>> to have a StorageAgent backed by a distributed in-memory store as an
> > >>>> alternative in the platform.
> > >>>>
> > >>>> Ashish
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Wed, Dec 2, 2015 at 10:35 PM, Munagala Ramanath <
> > ram@datatorrent.com
> > >> <ma...@datatorrent.com>>
> > >>>> wrote:
> > >>>>
> > >>>>> Ashish,
> > >>>>>
> > >>>>> In the current release, the HDFS writes are asynchronous so I'm
> > >> wondering
> > >>>>> if
> > >>>>> you could elaborate on how much latency you are observing both with
> > and
> > >>>>> without
> > >>>>> checkpointing (i.e. after your changes to make operators
> stateless).
> > >>>>>
> > >>>>> Also any information on how much non-transient data is being
> > >> checkpointed
> > >>>>> in
> > >>>>> each operator would also be useful. There is an effort under way to
> > >>>>> implement
> > >>>>> incremental checkpointing which should improve things when there
> is a
> > >> lot
> > >>>>> state
> > >>>>> but very little that changes from window to window.
> > >>>>>
> > >>>>> Ram
> > >>>>>
> > >>>>>
> > >>>>> On Wed, Dec 2, 2015 at 8:51 AM, Ashish Tadose <
> > ashishtadose@gmail.com
> > >> <ma...@gmail.com>>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi All,
> > >>>>>>
> > >>>>>> Currently Apex engine provides operator checkpointing in Hdfs (
> with
> > >> Hdfs
> > >>>>>> backed StorageAgents i.e. FSStorageAgent & AsyncFSStorageAgent )
> > >>>>>>
> > >>>>>> We have observed that for applications having large number of
> > operator
> > >>>>>> instances, hdfs checkpointing introduces latency in DAG which
> > degrades
> > >>>>>> overall application performance.
> > >>>>>> To resolve this we had to review all operators in DAG and had to
> > make
> > >> few
> > >>>>>> operators stateless.
> > >>>>>>
> > >>>>>> As operator check-pointing is critical functionality of Apex
> > streaming
> > >>>>>> platform to ensure fault tolerant behavior, platform should also
> > >> provide
> > >>>>>> alternate StorageAgents which will work seamlessly with large
> > >>>>> applications
> > >>>>>> that requires Exactly once semantics.
> > >>>>>>
> > >>>>>> HDFS read/write latency is limited and doesn't improve beyond
> > certain
> > >>>>> point
> > >>>>>> because of disk io & staging writes. Having alternate strategy to
> > this
> > >>>>>> check-pointing in fault tolerant distributed in-memory grid would
> > >> ensure
> > >>>>>> application stability and performance is not impacted.
> > >>>>>>
> > >>>>>> I have developed a in-memory storage agent which I would like to
> > >>>>> contribute
> > >>>>>> as alternate StorageAgent for checkpointing.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Ashish
> > >>>>>>
> > >>>>>
> > >>>
> > >>
> > >>
> >
> >
>

Re: Operator checkpointing in distributed in-memory store

Posted by Sandesh Hegde <sa...@datatorrent.com>.
Ashish,

Two more questions for you:
Which in-memory stores did you evaluate? Are they YARN compatible?

Thank you for your contribution.

Sandesh

Re: Operator checkpointing in distributed in-memory store

Posted by Gaurav Gupta <ga...@datatorrent.com>.
Ashish,

I have a couple of questions:
1. Are there standard APIs for distributed in-memory stores, or is this implementation specific to one particular tool?
2. Will the in-memory store compete with DataTorrent apps for cluster resources (memory/CPU)?
3. What is the purging policy? Who is responsible for cleaning up the resources of completed, failed, or aborted applications? This becomes important when you want to launch an application using a previous application ID.

Thanks
- Gaurav

Re: Operator checkpointing in distributed in-memory store

Posted by Gaurav Gupta <ga...@datatorrent.com>.
Ashish,

Sounds good, looking forward to it.

Thanks
- Gaurav

Re: Operator checkpointing in distributed in-memory store

Posted by Ashish Tadose <as...@gmail.com>.
Thanks Gaurav,

I have finished a baseline implementation of the StorageAgent and tested it
with demo applications by explicitly specifying it in the DAG configuration as
shown below, and it works fine.

dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);

I also had to make some changes to StramClient to pass additional
information such as the applicationId, which it does not pass currently.

I am going to create a JIRA task for this feature and will document the design &
implementation strategy there.
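
For readers following the thread, a minimal sketch of what an in-memory
checkpoint store could look like is given below. It is not the implementation
contributed in this thread and it does not use the Apex API:
InMemoryCheckpointSketch, CheckpointStore and MapBackedStore are hypothetical
names, and a local map stands in for a distributed, replicated grid. It only
illustrates the idea of keeping serialized operator state in memory, keyed by
operator and window, instead of writing one file per checkpoint.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class InMemoryCheckpointSketch {

    /** Hypothetical contract (an assumption, not the Apex interface). */
    interface CheckpointStore {
        void save(Object state, int operatorId, long windowId);
        Object load(int operatorId, long windowId);
        void delete(int operatorId, long windowId);
        long[] getWindowIds(int operatorId);
    }

    /** Holds serialized state in a local map; a real agent would use a replicated grid. */
    static class MapBackedStore implements CheckpointStore {
        // operatorId -> (windowId -> serialized state), window ids kept in ascending order
        private final Map<Integer, ConcurrentSkipListMap<Long, byte[]>> checkpoints =
            new ConcurrentHashMap<>();

        @Override
        public void save(Object state, int operatorId, long windowId) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(state); // state must be java.io.Serializable
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            checkpoints.computeIfAbsent(operatorId, id -> new ConcurrentSkipListMap<>())
                .put(windowId, bytes.toByteArray());
        }

        @Override
        public Object load(int operatorId, long windowId) {
            ConcurrentSkipListMap<Long, byte[]> perOperator = checkpoints.get(operatorId);
            byte[] data = perOperator == null ? null : perOperator.get(windowId);
            if (data == null) {
                throw new IllegalStateException(
                    "no checkpoint for operator " + operatorId + ", window " + windowId);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
                return in.readObject();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }

        @Override
        public void delete(int operatorId, long windowId) {
            ConcurrentSkipListMap<Long, byte[]> perOperator = checkpoints.get(operatorId);
            if (perOperator != null) {
                perOperator.remove(windowId);
            }
        }

        @Override
        public long[] getWindowIds(int operatorId) {
            ConcurrentSkipListMap<Long, byte[]> perOperator = checkpoints.get(operatorId);
            return perOperator == null
                ? new long[0]
                : perOperator.keySet().stream().mapToLong(Long::longValue).toArray();
        }
    }

    public static void main(String[] args) {
        MapBackedStore store = new MapBackedStore();
        store.save("counts@window-10", 1, 10L);
        store.save("counts@window-20", 1, 20L);
        System.out.println(store.load(1, 20L));           // restore the latest state
        store.delete(1, 10L);                             // purge an older checkpoint
        System.out.println(store.getWindowIds(1).length); // remaining checkpoints
    }
}
```

A local map offers no fault tolerance on its own, so the sketch says nothing
about recovery after a container failure; that part depends entirely on the
replication guarantees of whichever in-memory grid backs the agent.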

Thx,
Ashish


Re: Operator checkpointing in distributed in-memory store

Posted by Gaurav Gupta <ga...@datatorrent.com>.
Just to add, you can plug in your storage agent using the STORAGE_AGENT attribute (https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Context.OperatorContext.html#STORAGE_AGENT)

Thanks
- Gaurav

Re: Operator checkpointing in distributed in-memory store

Posted by Gaurav Gupta <ga...@datatorrent.com>.
Ashish,

You are right that exactly-once semantics can’t be achieved through Async FS writes.
Did you try the new StorageAgent with your application? If yes, do you have any numbers to compare?

Thanks
- Gaurav

Re: Operator checkpointing in distributed in-memory store

Posted by Ashish Tadose <as...@gmail.com>.
The application uses a large number of in-memory dimension store partitions to
hold high-cardinality aggregated data, and many intermediate operators also
keep non-transient cache data for reference lookups.

The application had more than 1000 partitions in total, which means a lot of
operators to checkpoint and, in turn, a lot of frequent HDFS write, rename &
delete operations, which became the bottleneck.

The application requires exactly-once semantics with idempotent operators, which
I suppose cannot be achieved through async FS writes; please correct me if
I'm wrong here.

Also, the application computes streaming aggregations of high-cardinality
incoming data streams, and the reference caches are updated frequently, so I am
not sure how much incremental checkpointing will help here.

Regardless of this specific application, I strongly think it would be good to
have a StorageAgent backed by a distributed in-memory store as an alternative
in the platform.

Ashish



Re: Operator checkpointing in distributed in-memory store

Posted by Munagala Ramanath <ra...@datatorrent.com>.
Ashish,

In the current release, the HDFS writes are asynchronous, so I'm wondering if
you could elaborate on how much latency you are observing both with and
without checkpointing (i.e. after your changes to make operators stateless).

Also, any information on how much non-transient data is being checkpointed in
each operator would be useful. There is an effort under way to implement
incremental checkpointing, which should improve things when there is a lot of
state but very little that changes from window to window.
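
As a rough illustration of why incremental checkpointing helps in that case,
the sketch below computes only the difference between the state at two
consecutive windows. It is an assumption-laden toy, not the planned
implementation: StateDelta and its methods are hypothetical names, and the
operator state is modelled as a plain Map<String, Long>.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical helper: the changes needed to turn one state map into another.
class StateDelta {
    final Map<String, Long> upserts = new HashMap<>();  // added or changed entries
    final Set<String> removals = new HashSet<>();       // keys that disappeared

    static StateDelta between(Map<String, Long> previous, Map<String, Long> current) {
        StateDelta delta = new StateDelta();
        for (Map.Entry<String, Long> e : current.entrySet()) {
            boolean isNew = !previous.containsKey(e.getKey());
            if (isNew || !Objects.equals(previous.get(e.getKey()), e.getValue())) {
                delta.upserts.put(e.getKey(), e.getValue());
            }
        }
        for (String key : previous.keySet()) {
            if (!current.containsKey(key)) {
                delta.removals.add(key);
            }
        }
        return delta;
    }

    // Rebuilds the newer state from the older one plus this delta.
    Map<String, Long> applyTo(Map<String, Long> base) {
        Map<String, Long> result = new HashMap<>(base);
        result.keySet().removeAll(removals);
        result.putAll(upserts);
        return result;
    }
}

class StateDeltaDemo {
    public static void main(String[] args) {
        Map<String, Long> previous = new HashMap<>();
        previous.put("a", 1L);
        previous.put("b", 2L);
        Map<String, Long> current = new HashMap<>(previous);
        current.put("b", 5L);  // only one entry changes between windows

        StateDelta delta = StateDelta.between(previous, current);
        // Only the delta would need to be written at this window.
        System.out.println(delta.upserts.size() + " upserts, "
                + delta.removals.size() + " removals");
        System.out.println(delta.applyTo(previous).equals(current));
    }
}
```

When most entries change every window, the delta approaches the size of the
full state and the saving largely disappears.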

Ram


On Wed, Dec 2, 2015 at 8:51 AM, Ashish Tadose <as...@gmail.com>
wrote:

> Hi All,
>
> Currently Apex engine provides operator checkpointing in Hdfs ( with Hdfs
> backed StorageAgents i.e. FSStorageAgent & AsyncFSStorageAgent )
>
> We have observed that for applications having large number of operator
> instances, hdfs checkpointing introduces latency in DAG which degrades
> overall application performance.
> To resolve this we had to review all operators in DAG and had to make few
> operators stateless.
>
> As operator check-pointing is critical functionality of Apex streaming
> platform to ensure fault tolerant behavior, platform should also provide
> alternate StorageAgents which will work seamlessly with large applications
> that requires Exactly once semantics.
>
> HDFS read/write latency is limited and doesn't improve beyond certain point
> because of disk io & staging writes. Having alternate strategy to this
> check-pointing in fault tolerant distributed in-memory grid would ensure
> application stability and performance is not impacted.
>
> I have developed a in-memory storage agent which I would like to contribute
> as alternate StorageAgent for checkpointing.
>
> Thanks,
> Ashish
>

Re: Operator checkpointing in distributed in-memory store

Posted by Amol Kekre <am...@datatorrent.com>.
Ashish,
This is great. Do send the link to your code.

Thanks,
Amol


On Wed, Dec 2, 2015 at 8:51 AM, Ashish Tadose <as...@gmail.com>
wrote:

> Hi All,
>
> Currently Apex engine provides operator checkpointing in Hdfs ( with Hdfs
> backed StorageAgents i.e. FSStorageAgent & AsyncFSStorageAgent )
>
> We have observed that for applications having large number of operator
> instances, hdfs checkpointing introduces latency in DAG which degrades
> overall application performance.
> To resolve this we had to review all operators in DAG and had to make few
> operators stateless.
>
> As operator check-pointing is critical functionality of Apex streaming
> platform to ensure fault tolerant behavior, platform should also provide
> alternate StorageAgents which will work seamlessly with large applications
> that requires Exactly once semantics.
>
> HDFS read/write latency is limited and doesn't improve beyond certain point
> because of disk io & staging writes. Having alternate strategy to this
> check-pointing in fault tolerant distributed in-memory grid would ensure
> application stability and performance is not impacted.
>
> I have developed a in-memory storage agent which I would like to contribute
> as alternate StorageAgent for checkpointing.
>
> Thanks,
> Ashish
>